In [23]:
# Locate the local Spark installation so that pyspark can be imported from this environment
import findspark
findspark.init()
findspark.find()  # return value (the Spark home it found) is not used here

from pyspark.sql import SparkSession

# Create (or reuse) a local SparkSession.
# spark.sql.shuffle.partitions is set here, at session creation, so that every later cell
# (including the explain() plans, which show 5 shuffle partitions) sees the same value on a
# fresh "Restart & Run All". The default of 200 is far too many for these tiny input files.
spark = (
    SparkSession.builder
    .master("local")
    .appName("PySpark Databricks")
    .config("spark.sql.shuffle.partitions", "5")
    .getOrCreate()
)

print("Spark Session Details:", spark)

Spark Session Details: <pyspark.sql.session.SparkSession object at 0x0000020758DDF9A0>


In [24]:
# Single-column DataFrame holding 0..4. spark.range names its column "id",
# so toDF() renames it to "number".
my_range = spark.range(0, 5).toDF("number")
my_range.show()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+



In [25]:
# Keep only the even numbers. filter() is an alias of where() and accepts the same SQL expression string.
evens = my_range.filter("number % 2 = 0")
evens.show()

+------+
|number|
+------+
|     0|
|     2|
|     4|
+------+



In [26]:
# Read the flights CSV, using the first line as the header and letting Spark infer column types.
# The resulting DataFrame is lazy, but inferSchema=true makes Spark scan the file once up front
# to determine the column types, so this read does trigger a job.
# Forward slashes keep the relative path portable across OSes and avoid the invalid "\F" escape
# that a backslash path produces in a normal (non-raw) Python string literal.
flightData2015 = (
    spark.read
    .option("inferschema", "true")
    .option("header", "true")
    .csv("Data_Files/Flights.csv")
)

flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [27]:
# Print the physical plan for sorting by "count" without executing it.
# orderBy() is an alias of sort(). The plan shows a range-partitioning Exchange (a shuffle)
# feeding the Sort. The partition count in that Exchange comes from spark.sql.shuffle.partitions.
flightData2015.orderBy("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#477 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#477 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=613]
      +- FileScan csv [DEST_COUNTRY_NAME#475,ORIGIN_COUNTRY_NAME#476,count#477] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/PRATIK/Documents/Practice/PySpark_Practice/Databricks_B..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [28]:
# Use 5 shuffle partitions instead of the default 200, which is plenty for this small dataset
spark.conf.set("spark.sql.shuffle.partitions", "5")

# head(n) returns the same list of Rows as take(n)
flightData2015.orderBy("count").head(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='Ireland', ORIGIN_COUNTRY_NAME='Afganistan', count=2)]

In [29]:
# Number of partitions backing the DataFrame as read from the CSV
# (input partitions, independent of the spark.sql.shuffle.partitions setting)
flightData2015.rdd.getNumPartitions()

1

In [30]:
# Expose the DataFrame to Spark SQL under a temporary view name
flightData2015.createOrReplaceTempView("flights_data_2015")

# SQL approach: number of rows per destination country
flights_sql_res = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flights_data_2015
GROUP BY DEST_COUNTRY_NAME""")
flights_sql_res.show()

# DataFrame approach: the same aggregation via groupBy().count()
flights_df_res = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)
flights_df_res.show()

# Whichever API is used, Spark compiles both queries to the same physical plan:
# a partial HashAggregate, a hash-partitioning Exchange, then the final HashAggregate
flights_sql_res.explain()
flights_df_res.explain()

+-----------------+--------+
|DEST_COUNTRY_NAME|count(1)|
+-----------------+--------+
|   United Kingdom|       2|
|          Ireland|       1|
|          Germany|       1|
|           Russia|       1|
|    United States|       3|
|            India|       1|
+-----------------+--------+

+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|   United Kingdom|    2|
|          Ireland|    1|
|          Germany|    1|
|           Russia|    1|
|    United States|    3|
|            India|    1|
+-----------------+-----+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#475], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#475, 5), ENSURE_REQUIREMENTS, [plan_id=720]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#475], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#475] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users

In [31]:
# Maximum value of the "count" column, computed two ways
# SQL
spark.sql(""" SELECT MAX(count) FROM flights_data_2015 """).show()

# DataFrame API.
# Import the functions module under an alias instead of `from pyspark.sql.functions import max`,
# which replaced Python's built-in max() for every later cell in the notebook.
from pyspark.sql import functions as F
flightData2015.select(F.max("count")).take(1)

+----------+
|max(count)|
+----------+
|       344|
+----------+



[Row(max(count)=344)]

In [32]:
# Top 5 destination countries by total flight count (SQL version).
# NOTE(review): Russia and India tie at 5, so their relative order is not guaranteed between runs.
max_sql = spark.sql(sqlQuery="""
SELECT DEST_COUNTRY_NAME, SUM(count) AS Destination_Total
FROM flights_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY SUM(count) DESC
LIMIT 5
""")

max_sql.collect()

[Row(DEST_COUNTRY_NAME='United States', Destination_Total=360),
 Row(DEST_COUNTRY_NAME='United Kingdom', Destination_Total=25),
 Row(DEST_COUNTRY_NAME='Germany', Destination_Total=10),
 Row(DEST_COUNTRY_NAME='Russia', Destination_Total=5),
 Row(DEST_COUNTRY_NAME='India', Destination_Total=5)]

In [33]:
from pyspark.sql.functions import desc

# Same top-5 query as the SQL version, expressed with the DataFrame API.
# orderBy() is an alias of sort().
max_df = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(desc("destination_total"))
    .limit(5)
)

# The sort + limit is planned as a single TakeOrderedAndProject step
max_df.explain()

max_df.collect()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#565L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#475,destination_total#565L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#475], functions=[sum(count#477)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#475, 5), ENSURE_REQUIREMENTS, [plan_id=858]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#475], functions=[partial_sum(count#477)])
            +- FileScan csv [DEST_COUNTRY_NAME#475,count#477] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/PRATIK/Documents/Practice/PySpark_Practice/Databricks_B..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




[Row(DEST_COUNTRY_NAME='United States', destination_total=360),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=25),
 Row(DEST_COUNTRY_NAME='Germany', destination_total=10),
 Row(DEST_COUNTRY_NAME='Russia', destination_total=5),
 Row(DEST_COUNTRY_NAME='India', destination_total=5)]

In [34]:
# Static (batch) read of the retail data. Its inferred schema is kept in staticSchema
# so that a streaming read of the same kind of files can reuse it.
# spark.read.csv(..., header=True, inferSchema=True) sets the same reader options as
# format("csv").option("header", "true").option("inferSchema", "true").load(...).
staticDataFrame = spark.read.csv(
    "Data_Files/Retail.csv",
    header=True,
    inferSchema=True,
)

staticDataFrame.createOrReplaceTempView("retail_data")

staticSchema = staticDataFrame.schema
staticDataFrame.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [40]:
# Number of retail rows per calendar day of InvoiceDate, busiest day first.
# No DISTINCT is needed: GROUP BY already yields exactly one row per day.
# NOTE(review): days with equal counts have no tiebreaker, so their order may vary between runs.
spark.sql("""
SELECT DATE(InvoiceDate), COUNT(*)
FROM retail_data
GROUP BY 1
ORDER BY 2 DESC
""").show()

+-----------+--------+
|InvoiceDate|count(1)|
+-----------+--------+
| 2010-12-01|       6|
| 2010-12-03|       2|
| 2010-12-05|       1|
| 2010-12-02|       1|
+-----------+--------+



In [50]:
# Total spend (UnitPrice * Quantity) per customer per 1-day tumbling window on InvoiceDate
from pyspark.sql.functions import window, column, col, desc

# NOTE: window() aligns its windows to 1970-01-01 00:00:00 UTC. In the output each "day" therefore
# runs 05:30 -> 05:30, presumably because the session time zone is UTC+05:30 — confirm
# before reading these as calendar days.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice*Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .show(truncate=False)
)

+----------+------------------------------------------+------------------+
|CustomerId|window                                    |sum(total_cost)   |
+----------+------------------------------------------+------------------+
|17850.0   |{2010-12-01 05:30:00, 2010-12-02 05:30:00}|112.92000000000002|
|17850.0   |{2010-12-03 05:30:00, 2010-12-04 05:30:00}|15.3              |
|17850.0   |{2010-12-05 05:30:00, 2010-12-06 05:30:00}|11.100000000000001|
|13047.0   |{2010-12-03 05:30:00, 2010-12-04 05:30:00}|54.08             |
|17850.0   |{2010-12-02 05:30:00, 2010-12-03 05:30:00}|22.0              |
+----------+------------------------------------------+------------------+



In [None]:
# Streaming version of the per-customer daily spend aggregation above.
# Kept for reference and disabled by default because it needs a directory of retail CSV files
# to watch. Set RUN_STREAMING_DEMO = True (and point STREAMING_INPUT_PATH at real files) to run it.
# It is written as real, syntactically valid code (not a string) so the reference stays correct.
RUN_STREAMING_DEMO = False
STREAMING_INPUT_PATH = ".../*.csv"  # TODO: point at a directory of retail CSV files

if RUN_STREAMING_DEMO:
    # Reading stream: file-based streaming sources need an explicit schema,
    # so reuse the one inferred from the static read
    streamingDataFrame = (
        spark.readStream
        .schema(staticSchema)
        .option("maxFilesPerTrigger", 1)  # consider at most one new file per trigger
        .format("csv")
        .option("header", "true")
        .load(STREAMING_INPUT_PATH)
    )

    print(streamingDataFrame.isStreaming)  # True for a streaming DataFrame

    purchaseByCustomerPerDay = (
        streamingDataFrame
        .selectExpr(
            "CustomerId",
            "(UnitPrice * Quantity) as total_cost",
            "InvoiceDate",
        )
        .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
        .sum("total_cost")
    )

    # Nothing runs until the query is started: streaming uses writeStream ... start()
    # instead of batch actions such as show()/collect()
    customer_purchases_query = (
        purchaseByCustomerPerDay.writeStream
        .format("memory")                 # in-memory table; use "console" to print each batch instead
        .queryName("customer_purchases")  # name of the in-memory table, queryable via spark.sql
        .outputMode("complete")           # write the full aggregated result table on every trigger
        .start()
    )

