In [1]:
from pyspark.sql import SparkSession

In [20]:
# Create the SparkSession (or reuse the active one) through the builder;
# an existing session can also hand out another one via newSession().
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("Spark_Examples")
    .getOrCreate()
)

In [25]:
# parallelize() creates an RDD from an in-memory Python list.
data_list = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd_parallelize = spark.sparkContext.parallelize(data_list)
rdd_parallelize

ParallelCollectionRDD[6] at readRDDFromFile at PythonRDD.scala:274

In [26]:
# textFile() creates an RDD from an external file (path is relative to the notebook's working directory).
rdd_text_file = spark.sparkContext.textFile(
    "./files/username.csv"
)
rdd_text_file

./files/username.csv MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:0

In [56]:
# createDataFrame() builds a DataFrame from local rows plus a list of column names.
# Only names are supplied, so the column types shown by printSchema() are inferred from the values.
data = [
    ("James", "", "Smith", "1991-04-01", "M", 3000),
    ("Michael", "Rose", "", "2000-05-19", "M", 4000),
    ("Robert", "", "Williams", "1978-09-05", "M", 4000),
    ("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
    ("Jen", "Mary", "Brown", "1980-02-17", "F", -1),
]

columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]

df = spark.createDataFrame(data, schema=columns)

df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [57]:
# Convert the Spark DataFrame to a pandas DataFrame so the notebook renders it as a table.
# NOTE: toPandas() loads every row into the driver's memory; use it only on small DataFrames.
df.toPandas()

Unnamed: 0,firstname,middlename,lastname,dob,gender,salary
0,James,,Smith,1991-04-01,M,3000
1,Michael,Rose,,2000-05-19,M,4000
2,Robert,,Williams,1978-09-05,M,4000
3,Maria,Anne,Jones,1967-12-01,F,4000
4,Jen,Mary,Brown,1980-02-17,F,-1


In [58]:
# df.show() prints the first 20 rows of the DataFrame by default (pass n to change the limit).
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



Supported file formats

The DataFrame API provides a rich set of readers and writers that support many file formats, including:
- csv
- text
- Avro
- Parquet
- tsv
- xml and many more

---
# PySpark SQL Tutorial

In [33]:
# Register the DataFrame as a temporary view so it can be queried by name from SQL.
df.createOrReplaceTempView("PERSON_DATA")
# spark.sql() returns the query result as a new DataFrame.
df2 = spark.sql("SELECT * FROM PERSON_DATA")
# Show that the schema and rows of the SQL result match the source DataFrame.
df2.printSchema()
df2.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [35]:
# Count the rows per gender with a SQL aggregation over the PERSON_DATA temp view.
group_df = spark.sql(
    "SELECT gender, count(*) as qtd FROM PERSON_DATA group by gender"
)
group_df.show()

+------+---+
|gender|qtd|
+------+---+
|     F|  2|
|     M|  3|
+------+---+



---
# Streaming from TCP Socket

In [45]:
# Open a streaming DataFrame that reads from a TCP socket on localhost:9090.
# The socket source is for testing only: Spark warns that it does not support recovery.
# NOTE(review): this rebinds `df`, replacing the batch DataFrame built with createDataFrame()
# earlier in the notebook; re-running earlier cells afterwards will see the streaming one.
df = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9090")
    .load()
)

22/02/21 02:08:59 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [53]:
# Configure a console sink in "update" output mode for the streaming DataFrame.
# NOTE(review): .start() is not called here, so `query` holds the configured writer,
# not a running streaming query; confirm that a later cell starts it.
query = (
    df.writeStream
    .format("console")
    .outputMode("update")
)