Import the PySpark library

In [27]:
# Notebook-wide imports. The original cell also ran `import spark`, which is not
# part of PySpark; the name `spark` is rebound to the SparkSession in the next
# cell anyway, so that import is dropped. Row/col are imported here as well so
# every dependency is visible at the top of the notebook.
import pyspark
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col

Create Spark Session

In [28]:
# Build (or reuse) the notebook's SparkSession: getOrCreate() returns an
# already-active session if there is one, so re-running this cell is harmless.
spark = (
    SparkSession.builder
    .appName("Flight Data Analysis")
    .getOrCreate()
)

Create a DataFrame from a range of numbers

In [29]:
# spark.range produces a single column of integers 0..99; toDF gives it the name `number`.
myrange = (
    spark.range(100)
    .toDF("number")
)
myrange.show(5)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+
only showing top 5 rows



Syntax to read a CSV file

In [30]:

# Read the 2015 flight summary: the first line is a header, and Spark infers
# column types from the data (e.g. `count` is read as an int).
flightData2015 = spark.read.csv(
    "Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv",
    header=True,
    inferSchema=True,
)

# Peek at the first five rows.
flightData2015.show(5)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



A DataFrame is a distributed collection of Row objects; fetching the first few rows returns them as a Python list of Rows

In [31]:
flightData2015.head(3)  # head(n) == take(n): a Python list of the first n Row objects

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

Explain Plan in Spark

In [32]:
flightData2015.orderBy('count').explain()  # orderBy is an alias of sort; explain() prints the plan only

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#153 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#153 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=233]
      +- FileScan csv [DEST_COUNTRY_NAME#151,ORIGIN_COUNTRY_NAME#152,count#153] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/e:/Data Engineering/Spark-Practice/Spark-The-Definitive-Guide/da..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




Configuring the shuffle partition count

In [33]:
# Number of partitions used when shuffling data for joins/aggregations (default 200).
# The key must be spelled exactly; Spark does not reject unknown keys, so the
# earlier misspelling "spark.sql.shuffle.partitons" was silently ignored.
spark.conf.set("spark.sql.shuffle.partitions", "500")

In [34]:
flightData2015.orderBy('count')  # lazy: only builds a DataFrame (append .explain() to inspect partitioning)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: int]

Create a temporary view in Spark

In [35]:
# Expose the DataFrame to spark.sql under a name, replacing any existing temp view with that name.
flightData2015.createOrReplaceTempView(name="flight_Data_2015")

Spark SQL

In [36]:
# Rows per destination country: count(1) counts rows, not the values in the `count` column.
rows_per_destination_query = ''' 
          select DEST_COUNTRY_NAME , count(1) 
          from flight_Data_2015 
          group by DEST_COUNTRY_NAME'''
sqlWay = spark.sql(rows_per_destination_query)
sqlWay.show(5)

+-----------------+--------+
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
|         Anguilla|       1|
|           Russia|       1|
|         Paraguay|       1|
|          Senegal|       1|
|           Sweden|       1|
+-----------------+--------+
only showing top 5 rows



In [37]:
# Top 5 destination countries by total trips (sum of the `count` column).
top_destinations_query = """ 
                 select DEST_COUNTRY_NAME, SUM(count) as Total_Trips 
                 from flight_Data_2015
                 group by DEST_COUNTRY_NAME
                 order by Total_Trips desc 
                 limit 5 """
sql2 = spark.sql(top_destinations_query)
sql2.show()

+-----------------+-----------+
|DEST_COUNTRY_NAME|Total_Trips|
+-----------------+-----------+
|    United States|     411352|
|           Canada|       8399|
|           Mexico|       7140|
|   United Kingdom|       2025|
|            Japan|       1548|
+-----------------+-----------+



Case Sensitivity 

In [38]:
# Whether Spark SQL resolves identifiers (column/table names) case-sensitively.
# The real key is "spark.sql.caseSensitive"; the bare "caseSensitive" key is not a
# Spark setting and had no effect. Default is false.
spark.conf.set("spark.sql.caseSensitive", "false")

Creating Rows

In [39]:
from pyspark.sql import Row

# A Row built from positional values only (no field names), much like a tuple.
row_values = ("Hello", None, 1, False)
myRow = Row(*row_values)

Random Samples

In [54]:
# Draw roughly half the rows without replacement. The fixed seed makes the
# result repeatable on the same data; `seed` is reused by randomSplit below.
fraction = 0.5
withReplacement = False
seed = 5
flightData2015.sample(withReplacement=withReplacement, fraction=fraction, seed=seed).count()

138

In [56]:
flightData2015.show(n=3)  # sample() returned a new DataFrame; the original is unchanged

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



Random splits

In [57]:
# Split into two DataFrames with roughly 25% / 75% of the rows, seeded for repeatability.
dataFrames = flightData2015.randomSplit(weights=[0.25, 0.75], seed=seed)
tuple(part.count() for part in dataFrames)

(71, 185)

Repartition and Coalesce

In [58]:
# Fresh copy of the 2015 summary for the partitioning experiments below.
df = spark.read.csv(
    "Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv",
    header=True,
    inferSchema=True,
)

In [61]:
df_partition_count = df.rdd.getNumPartitions()  # partition count is exposed via the underlying RDD
df_partition_count

1

In [78]:
from pyspark.sql.functions import col

# Number of distinct destination countries; dropDuplicates() with no subset is the same as distinct().
df.select(col('DEST_COUNTRY_NAME')).dropDuplicates().count()

132

In [79]:
partitionedDf = df.repartition(10, df['DEST_COUNTRY_NAME'])  # hash-partition by destination into 10 partitions

In [80]:
repartitioned_count = partitionedDf.rdd.getNumPartitions()
repartitioned_count

10

In [81]:
coalesceDf = partitionedDf.coalesce(numPartitions=5)  # merges partitions via a narrow dependency (no full shuffle)

In [83]:
coalesced_count = coalesceDf.rdd.getNumPartitions()
coalesced_count

5