# Import SparkSession

In [None]:
from pyspark.sql import SparkSession

# Create SparkSession 

In [3]:
spark = SparkSession.builder.master("local[1]").appName("jaraghe").getOrCreate()

SparkContext has several functions to use with RDDs. For example, its parallelize() method creates an RDD from a list.

A PySpark RDD (Resilient Distributed Dataset) is the fundamental data structure of PySpark: a fault-tolerant, immutable, distributed collection of objects. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

To create an RDD, you first need a SparkSession, which is the entry point to a PySpark application. A SparkSession can be obtained through SparkSession.builder (ending in getOrCreate()) or by calling newSession() on an existing session.

**A SparkSession internally creates a SparkContext, available as its sparkContext attribute. You can create multiple SparkSession objects but only one SparkContext per JVM. If you want to create a new SparkContext, you must first stop the existing one (using stop()).**

In [20]:
rdd1 = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd2 = spark.sparkContext.parallelize(dataList)

In [21]:
rdd2.count()

3

An RDD can also be created from a text file using the textFile() method of SparkContext.

In [22]:
# Create RDD from external Data source
rdd3 = spark.sparkContext.textFile("README.md")

**Any operation you perform on RDD runs in parallel.**

You can perform two kinds of operations:

RDD transformations – Transformations are lazy operations. When you run a transformation (for example, an update), it does not modify the current RDD; it returns another RDD. Some RDD transformations are flatMap(), map(), reduceByKey(), filter(), sortByKey().

RDD actions – Operations that trigger computation and return values to the driver, such as count(), collect(), first(), max(), reduce().

# PySpark DataFrame
You already know what a pandas DataFrame is. A PySpark DataFrame is mostly similar, with one key difference: a PySpark DataFrame is distributed across the cluster and its operations execute in parallel on the cluster's machines, whereas a pandas DataFrame is stored and processed on a single machine.

In [28]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df1 = spark.createDataFrame(data=data, schema = columns)

# printSchema() and show() print directly and return None, so no print() wrapper is needed
df1.printSchema()
df1.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+


In [29]:
df2 = spark.read.csv("peyda-results.csv")
df2.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



# PySpark SQL
Once you have created a DataFrame, you can query it using SQL syntax.

To use SQL, first register the DataFrame as a temporary view with the createOrReplaceTempView() method. Once created, the view can be queried with sql() for as long as the SparkSession that created it is alive.

In [30]:
df1.createOrReplaceTempView("PERSON_DATA")
df3 = spark.sql("SELECT * FROM PERSON_DATA")
df3.printSchema()
df3.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [31]:
groupDF = spark.sql("SELECT gender, count(*) FROM PERSON_DATA group by gender")
groupDF.show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F|       2|
|     M|       3|
+------+--------+



# PySpark Streaming

![streaming-arch.png](images/streaming-arch.png)

In [33]:
df4 = spark.readStream.format("socket").option("host", "localhost").option("port", "9090").load()
df4.printSchema()

root
 |-- value: string (nullable = true)



After processing, you can stream the DataFrame to the console. In a real-world deployment, you would typically stream it to a sink such as Kafka or a database.

In [39]:
# df5 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "192.168.1.100:9092").option("subscribe", "json_topic").option("startingOffsets", "earliest").load()

In [40]:
# f.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value").writeStream.format("kafka").outputMode("append").option("kafka.bootstrap.servers", "192.168.1.100:9092").option("topic", "josn_data_topic").start().awaitTermination()