# SPARK hands-on exercises and examples

In [1]:
import findspark

In [2]:
findspark.init('/home/agharamudhalvi/spark-3.3.0-bin-hadoop3')

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
import pandas as pd

In [6]:
# SparkSession was already imported above; create (or reuse) the session.
spark = SparkSession.builder.appName("Assignment").getOrCreate()

23/01/30 20:09:18 WARN Utils: Your hostname, TIGER02310 resolves to a loopback address: 127.0.1.1; using 172.18.176.1 instead (on interface eth1)
23/01/30 20:09:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/30 20:09:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# DAY 1

# WHAT IS BIG DATA?



Big data refers to datasets that are too large or too complex to process efficiently on a single machine. Apache Spark is an open-source, distributed computing framework designed for processing such data at scale. It keeps intermediate data in memory where possible, which is typically faster than traditional disk-based systems. Spark can read structured, semi-structured, and unstructured data from many sources and formats, which makes it a popular tool for big data processing and analysis.



# WHY SPARK?



Spark is widely adopted due to several key benefits, including:

Speed: Spark supports in-memory computing, which allows for faster processing of large data sets compared to traditional disk-based systems.

Ease of Use: Spark has a high-level API in multiple programming languages, making it easy to develop and maintain applications.

Scalability: Spark can be easily scaled up or down as needed, allowing for dynamic resource allocation and efficient use of hardware.

Fault Tolerance: Spark tracks the lineage of each dataset, so it can recompute lost partitions and re-run failed tasks automatically.

Support for Multiple Data Sources and Formats: Spark supports various data sources and formats, including structured, semi-structured, and unstructured data.

Integrated with Other Big Data Tools: Spark integrates with other big data tools, such as Hadoop, and can run on top of existing Hadoop clusters.

Overall, Spark provides a fast, easy-to-use, and flexible platform for big data processing and analysis. It manages and coordinates the execution of tasks on data across a cluster of computers.







# WHAT IS SPARK?



Apache Spark is an open-source, distributed computing framework designed for large-scale data processing. It was developed to address the limitations of Hadoop MapReduce and provide a faster and more flexible solution for big data processing. Spark supports in-memory computing and offers APIs in multiple programming languages, making it easier to develop and maintain big data applications. Spark can process various types of data, including structured, semi-structured, and unstructured data, and integrates with other big data tools such as Hadoop and Apache Cassandra. Its advanced features and performance make Spark a popular choice for big data processing and analysis.


# INTERNALS OF SPARK



Resilient Distributed Datasets (RDD): Spark's fundamental data structure that allows it to distribute and parallelize data across multiple nodes in a cluster.

Cluster Manager: Spark can run on a standalone cluster or on top of an existing cluster manager like Apache Mesos, Hadoop YARN or Kubernetes.

Task Scheduler: The task scheduler is responsible for distributing tasks across the nodes in a cluster.

Shuffle: The process of redistributing data across nodes for aggregation or joining purposes.

DAG (Directed Acyclic Graph) Scheduler: The DAG scheduler turns the graph of RDD operations into stages, splitting at shuffle boundaries, and hands the tasks of each stage to the task scheduler.

Executors: Executors run on worker nodes and execute tasks assigned by the task scheduler.

Driver Program: The driver program controls the execution of a Spark application and communicates with the executors.

Caching: Spark can cache data in memory for faster access, reducing the need to recompute expensive operations.






# High-level API of Spark



The sections below cover the core concepts of the high-level API: SparkSession, DataFrame, partitions, transformations, actions, and lazy evaluation.

# SparkSession

In [7]:
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.appName("Assignment").getOrCreate()

In [8]:
spark

In [9]:
myRange = spark.range(1000).toDF("number")

SparkSession is the entry point for running Spark code with the Dataset and DataFrame API. It is used to create DataFrames, read data, and execute SQL queries. Lower-level features such as RDDs, accumulators, and broadcast variables are reached through the underlying SparkContext, which is available as spark.sparkContext.

# DataFrame

A DataFrame in Spark is a distributed collection of data organized into named columns. It is similar to a table in a relational database or a data frame in R or pandas. Spark DataFrames provide a rich API for operating on the data and are optimized for big data processing. They support operations such as filtering, aggregation, and transformation, and they can be converted to RDDs or registered as temporary SQL views for further processing. DataFrames also interoperate with Spark SQL and can be saved to various storage systems such as HDFS and S3.

# Partitions

In Spark, a partition is a unit of parallelism. It is a logical division of data that is processed in parallel across a cluster of nodes. Each partition contains a portion of the data and operates independently of other partitions.

Spark partitions the data in RDDs and DataFrames to allow for parallel processing. The number of partitions can be specified while creating RDDs or DataFrames, or it can be determined dynamically by Spark based on the cluster configuration. A larger number of partitions can lead to higher parallelism, but also increases overhead for task scheduling and coordination. On the other hand, a smaller number of partitions can limit parallelism and impact the performance of processing large datasets. Proper partitioning of data is important for optimizing Spark performance.
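
As a rough illustration of the idea, the plain-Python sketch below splits records into a fixed number of partitions by hashing a key and then processes each partition independently. It is only an analogy and does not use Spark; the function name hash_partition and the sample records are made up for this example.

```python
import zlib
from collections import defaultdict


def hash_partition(records, num_partitions):
    """Assign each (key, value) record to a partition by hashing its key."""
    partitions = defaultdict(list)
    for key, value in records:
        # crc32 is stable across runs, unlike Python's built-in hash() for strings.
        index = zlib.crc32(key.encode("utf-8")) % num_partitions
        partitions[index].append((key, value))
    return dict(partitions)


# Hypothetical sample data: (country, flight count) pairs.
records = [("US", 15), ("Romania", 1), ("US", 344), ("Ireland", 62), ("Romania", 7)]
partitions = hash_partition(records, num_partitions=3)

# Each partition can be processed on its own; here we sum the values per partition
# and then combine the partial results.
partial_sums = {index: sum(v for _, v in rows) for index, rows in partitions.items()}
total = sum(partial_sums.values())
```

Because the partition index depends only on the key, all records with the same key end up in the same partition, which is the property that key-based aggregations and joins rely on.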

# Transformation

In Spark, transformations are operations on RDDs or DataFrames that produce a new RDD or DataFrame. Transformations are evaluated lazily: calling one only records it, and nothing runs until an action is performed. Some commonly used transformations are listed below. Note that map, flatMap, reduceByKey, and groupByKey are RDD operations; in PySpark, DataFrames offer methods such as select, withColumn, and groupBy instead.

map: applies a function to each element of an RDD and returns a new RDD with the results.

filter: returns a new RDD or DataFrame containing only the elements that satisfy a given condition.

flatMap: is similar to map, but each input item can be mapped to zero or more output items.

reduceByKey: aggregates the values of each key in an RDD of key-value pairs, using a user-defined reduce function.

groupByKey: groups the values of each key in an RDD of key-value pairs.

join: combines two RDDs or DataFrames based on a common key.

union: combines two RDDs or DataFrames into a single RDD or DataFrame by appending their elements.

These transformations allow for the creation of complex processing pipelines, making it possible to process large amounts of data in a distributed and scalable manner.
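
To make the semantics of a few of these transformations concrete, here is a small plain-Python sketch that mimics flatMap, map, filter, and reduceByKey on an ordinary list. It does not use Spark and runs eagerly on one machine; the helper reduce_by_key and the sample lines are assumptions made for the illustration.

```python
from itertools import chain

lines = ["spark is fast", "spark is lazy"]

# flatMap: each input item yields zero or more output items.
words = list(chain.from_iterable(line.split() for line in lines))

# map: exactly one output item per input item.
pairs = [(word, 1) for word in words]

# filter: keep only the elements that satisfy a predicate.
non_trivial = [(word, n) for word, n in pairs if word != "is"]


def reduce_by_key(key_value_pairs, func):
    """Combine all values that share a key using a binary function."""
    result = {}
    for key, value in key_value_pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


counts = reduce_by_key(non_trivial, lambda a, b: a + b)
print(counts)
```

In Spark the same steps would be distributed across partitions, and reduceByKey would combine values within each partition before shuffling the partial results.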







In [10]:
divisBy2 = myRange.where("number % 2 = 0")

# Actions

In Spark, actions are operations that trigger the computation of an RDD or DataFrame and either return a result to the driver program or write it to storage. The following are some commonly used actions:

count: returns the number of elements in an RDD or DataFrame.

first: returns the first element of an RDD or the first row of a DataFrame.

take: returns the first n elements of an RDD or DataFrame as a list.

reduce: aggregates the elements of an RDD using a user-defined function.

collect: returns all the elements of an RDD or DataFrame to the driver program as a list.

saveAsTextFile: writes the elements of an RDD to text files.

write.parquet: writes a DataFrame to Parquet files (the older saveAsParquetFile method is not part of the Spark 3 DataFrame API).

show: prints the first n rows of a DataFrame in a tabular format.

These actions trigger the computation of all the transformations in the processing pipeline and produce the final result of a Spark application. They are crucial for retrieving the results of a Spark computation and making them available for further analysis or storage.






In [11]:
divisBy2.count()


                                                                                

500

# Lazy Evaluation

Lazy evaluation is an evaluation strategy used in Spark in which transformations are not executed when they are called. Instead, Spark records them and delays their execution until an action is performed.

The main advantage of lazy evaluation is that it allows Spark to optimize the execution plan and avoid unnecessary computation. By deferring the computation, Spark can analyze the entire sequence of transformations and determine the most efficient way to execute them. This can lead to significant performance improvements, especially when dealing with large datasets.

Caching fits into the same model. Marking a DataFrame or RDD with cache() or persist() is itself lazy: the data is stored the first time an action computes it, and later actions reuse the stored result instead of recomputing it.

In summary, lazy evaluation is a key feature of Spark that enables efficient processing of big data and improves the overall performance of Spark applications.
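
The record-now, run-later behaviour described above can be sketched in plain Python. The toy class below is not how Spark is implemented; LazyDataset, is_even, and the calls list are hypothetical names used only to show that no work happens until an action is called.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (not Spark's implementation)."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)  # recorded transformations, not yet run

    def map(self, func):
        # Transformation: return a new dataset with one more recorded step.
        return LazyDataset(self._source, self._steps + (("map", func),))

    def filter(self, predicate):
        # Transformation: also only recorded.
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    def _iterate(self):
        for item in self._source:
            keep = True
            for kind, func in self._steps:
                if kind == "map":
                    item = func(item)
                elif not func(item):
                    keep = False
                    break
            if keep:
                yield item

    def count(self):
        # Action: only now is the recorded pipeline executed.
        return sum(1 for _ in self._iterate())


calls = []


def is_even(n):
    calls.append(n)  # record each evaluation so we can see when work happens
    return n % 2 == 0


evens = LazyDataset(range(1000)).filter(is_even)
calls_before_action = len(calls)  # the predicate has not been called yet
result = evens.count()            # triggers execution of the recorded steps
calls_after_action = len(calls)
print(calls_before_action, result, calls_after_action)
```

This mirrors the divisBy2 example: defining the filter does nothing on its own, and the count is what causes the 1000 numbers to be examined.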






In [12]:
from pyspark.shell import spark

23/01/30 20:09:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/

Using Python version 3.9.13 (main, May 23 2022 22:01:06)
Spark context Web UI available at http://172.18.176.1:4040
Spark context available as 'sc' (master = local[*], app id = local-1675089578491).
SparkSession available as 'spark'.


In [13]:
pyspark


<module 'pyspark' from '/home/agharamudhalvi/spark-3.3.0-bin-hadoop3/python/pyspark/__init__.py'>

In [16]:
from pyspark import SparkContext, SQLContext, SparkConf, StorageLevel

# Spark UI

You can monitor the progress of a job through the Spark web UI. The Spark UI is
available on port 4040 of the driver node. If you are running in local mode, this will
be http://localhost:4040.

# END TO END EXAMPLES

In [17]:
flightData2015 = spark\
 .read\
 .option("inferSchema", "true")\
 .option("header", "true")\
 .csv("data/flight-data/csv/2015-summary.csv")

In [19]:
flightData2015.take(3)


[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [20]:
flightData2015.sort("count").explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#52 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#52 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#90]
      +- FileScan csv [DEST_COUNTRY_NAME#50,ORIGIN_COUNTRY_NAME#51,count#52] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/agharamudhalvi/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [21]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [22]:
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [23]:
spark.conf.set("spark.sql.shuffle.partitions", "6")

In [24]:
flightData2015.sort("count").take(3)

[Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

In [25]:
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [26]:
flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

# DataFrames and SQL

In [27]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [28]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = flightData2015\
 .groupBy("DEST_COUNTRY_NAME")\
 .count()
sqlWay.explain()
dataFrameWay.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#50], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#50, 2), ENSURE_REQUIREMENTS, [id=#124]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#50], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#50] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/agharamudhalvi/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#50], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#50, 2), ENSURE_REQUIREMENTS, [id=#137]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#50], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#50] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 p

In [29]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)


[Row(max(count)=370002)]

In [30]:
# Note: this import shadows Python's built-in max() for the rest of the notebook.
from pyspark.sql.functions import max

In [31]:
flightData2015.select(max("count")).take(1)


[Row(max(count)=370002)]

In [32]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")


In [33]:
maxSql.show()


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [34]:
from pyspark.sql.functions import desc

In [35]:
flightData2015\
 .groupBy("DEST_COUNTRY_NAME")\
 .sum("count")\
 .withColumnRenamed("sum(count)", "destination_total")\
 .sort(desc("destination_total"))\
 .limit(5)\
 .show()


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [36]:
flightData2015\
 .groupBy("DEST_COUNTRY_NAME")\
 .sum("count")\
 .withColumnRenamed("sum(count)", "destination_total")\
 .sort(desc("destination_total"))\
 .limit(5)\
 .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#152L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#50,destination_total#152L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#50], functions=[sum(count#52)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#50, 2), ENSURE_REQUIREMENTS, [id=#337]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#50], functions=[partial_sum(count#52)])
            +- FileScan csv [DEST_COUNTRY_NAME#50,count#52] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/agharamudhalvi/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




# Structured Streaming

Structured Streaming is a high-level API for stream processing that became
production-ready in Spark 2.2. With Structured Streaming, you can take the same
operations that you perform in batch mode using Spark’s structured APIs and run
them in a streaming fashion. This can reduce latency and allow for incremental
processing. The main benefit is that you can extract value from streaming systems
quickly, with virtually no code changes. It is also easy to reason about: you can
write a batch job as a prototype and then convert it to a streaming job. Under the
hood, Spark processes the data incrementally as it arrives.
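
The idea of incremental processing can be sketched without Spark. In the plain-Python example below, each small batch of rows is folded into a running total per customer and day, instead of recomputing the aggregate from scratch. The batches, customer IDs, and helper names are hypothetical, and the one-day window is simplified to start at midnight.

```python
from collections import defaultdict
from datetime import datetime


def day_window(timestamp):
    """Return the start of the one-day window that contains the timestamp."""
    return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)


def update_totals(state, batch):
    """Fold one batch of (customer_id, timestamp, unit_price, quantity) rows into the state."""
    for customer_id, timestamp, unit_price, quantity in batch:
        state[(customer_id, day_window(timestamp))] += unit_price * quantity
    return state


# Hypothetical batches, standing in for input files that arrive one at a time.
batches = [
    [(17450, datetime(2011, 9, 20, 9, 0), 2.0, 10),
     (12345, datetime(2011, 9, 20, 10, 0), 1.5, 4)],
    [(17450, datetime(2011, 9, 20, 15, 0), 3.0, 5),
     (17450, datetime(2011, 9, 21, 8, 0), 1.0, 1)],
]

state = defaultdict(float)
snapshots = []
for batch in batches:
    update_totals(state, batch)
    # Keep a copy of the full result table after every batch.
    snapshots.append(dict(state))

print(snapshots[-1])
```

Each snapshot is the complete, up-to-date result after one more batch, which is the kind of continuously updated table a streaming aggregation maintains.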

In [37]:
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("data/retail-data/by-day/*.csv")



                                                                                

In [38]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [39]:
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .sort(desc("sum(total_cost)"))\
  .show(5)



+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|{2011-09-20 05:30...|          71601.44|
|      null|{2011-11-14 05:30...|          55316.08|
|      null|{2011-11-07 05:30...|          42939.17|
|      null|{2011-03-29 05:30...| 33521.39999999998|
|      null|{2011-12-08 05:30...|31975.590000000007|
+----------+--------------------+------------------+
only showing top 5 rows



                                                                                

By default, spark.sql.shuffle.partitions (the number of partitions created by a shuffle)
is 200, but because there aren’t many executors on this machine, it’s worth reducing this to 5.

In [40]:
spark.conf.set("spark.sql.shuffle.partitions", "5")


In [41]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("data/retail-data/by-day/*.csv")  # same relative path as the static read

We can check whether our DataFrame is streaming:

In [42]:
streamingDataFrame.isStreaming

True

In [43]:
purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")


In [44]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

23/01/30 20:10:54 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-8fa9e044-7e60-40c4-8495-8d9ebef4eeb8. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/01/30 20:10:54 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f882bc0a340>

In [45]:
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .show(5)

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



# Machine Learning and Advanced Analytics

In [48]:
staticDataFrame.printSchema()


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [46]:
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
  .na.fill(0)\
  .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
  .coalesce(5)

In [49]:
trainDataFrame = preppedDataFrame\
  .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
  .where("InvoiceDate >= '2011-07-01'")

In [50]:
trainDataFrame.count()
testDataFrame.count()

                                                                                

296006

In [51]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
  .setInputCol("day_of_week")\
  .setOutputCol("day_of_week_index")

In [52]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")

In [53]:

from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
  .setOutputCol("features")


In [54]:
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
  .setStages([indexer, encoder, vectorAssembler])

In [55]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

                                                                                

In [56]:
transformedTraining = fittedPipeline.transform(trainDataFrame)


In [57]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
 .setK(20)\
 .setSeed(1)

In [58]:
kmModel = kmeans.fit(transformedTraining)

                                                                                

In [59]:
transformedTest = fittedPipeline.transform(testDataFrame)

# Lower-Level APIs

In [63]:
from pyspark.sql import Row

spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()

DataFrame[_1: bigint]

# Overview of Structured Spark Types


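Spark has its own type system and expression engine. Operations on columns are
translated into Spark expressions rather than being evaluated by the host language.
For example, the following code does not perform addition in Scala
or Python; it actually performs addition purely in Spark: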


In [67]:
df = spark.range(500).toDF("number")
df.select(df["number"] + 10)

DataFrame[(number + 10): bigint]
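
One way to picture this is that the + operator builds a description of the computation instead of computing a value. The plain-Python sketch below shows that pattern with a tiny expression tree. It is only an analogy for how column expressions behave, not Spark's actual classes; Expr, ColumnRef, Literal, and Add are made-up names.

```python
class Expr:
    """Base class: operators return new expression nodes instead of computing values."""

    def __add__(self, other):
        return Add(self, other if isinstance(other, Expr) else Literal(other))


class ColumnRef(Expr):
    def __init__(self, name):
        self.name = name

    def evaluate(self, row):
        return row[self.name]

    def __str__(self):
        return self.name


class Literal(Expr):
    def __init__(self, value):
        self.value = value

    def evaluate(self, row):
        return self.value

    def __str__(self):
        return str(self.value)


class Add(Expr):
    def __init__(self, left, right):
        self.left, self.right = left, right

    def evaluate(self, row):
        return self.left.evaluate(row) + self.right.evaluate(row)

    def __str__(self):
        return f"({self.left} + {self.right})"


expression = ColumnRef("number") + 10  # builds a tree; nothing is added yet
description = str(expression)
values = [expression.evaluate({"number": n}) for n in range(3)]
print(description, values)
```

The expression is just data until an engine evaluates it against rows, which is why the DataFrame above reports a column named (number + 10) without having computed anything.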

# Rows


A row is nothing more than a record of data. Each record in a DataFrame must be of
type Row, as we can see when we collect the following DataFrames.

In [68]:
spark.range(2).collect()

[Row(id=0), Row(id=1)]

# Spark Types

In [69]:
from pyspark.sql.types import ByteType
b = ByteType()


# DAY 2 (chapters 4 and 5)

# 2. Basic Structured Operations

# 2.1 Schema

In [70]:
df = spark.read.format("json")\
.load("data/flight-data/json/2015-summary.json")


In [71]:
spark.read.format("json")\
.load("data/flight-data/json/2015-summary.json").schema


StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

In [73]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema)\
  .load("data/flight-data/json/2015-summary.json")
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



# 2.2 Columns and Expressions

In [74]:
from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")

Column<'someColumnName'>

In [76]:
df.select(col("count")).show()

+-----+
|count|
+-----+
|   15|
|    1|
|  344|
|   15|
|   62|
|    1|
|   62|
|  588|
|   40|
|    1|
|  325|
|   39|
|   64|
|    1|
|   41|
|   30|
|    6|
|    4|
|  230|
|    1|
+-----+
only showing top 20 rows



In [77]:
from pyspark.sql.functions import expr
expr("(((someCol + 5) * 200) - 6) < otherCol")


Column<'((((someCol + 5) * 200) - 6) < otherCol)'>

Accessing a DataFrame’s columns

In [83]:
spark.read.format("json").load("data/flight-data/json/2015-summary.json")\
 .columns



['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

# 2.3 Records and Rows

In [84]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

# 2.4 Create Rows

In [85]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

In [87]:
myRow[0]


'Hello'

In [88]:
myRow[2]

1

# 2.6 Creating DataFrames

In [89]:
df = spark.read.format("json").load("data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")


In [90]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()


+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



# 2.7 select and selectExpr

In [92]:
df.select("DEST_COUNTRY_NAME").show(2)


+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [94]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)


+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [93]:

from pyspark.sql.functions import expr, col, column
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"))\
  .show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [95]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)



+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [96]:
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))\
  .show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [97]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)



+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [98]:

df.selectExpr(
  "*", # all original columns
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
  .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [99]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [100]:
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)


+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



# 2.8 Adding columns

In [101]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [102]:

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
  .show(2)


+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



# 2.9 Renaming columns

In [103]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns


['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [104]:
dfWithLongColName = df.withColumn(
    "This Long Column-Name",
    expr("ORIGIN_COUNTRY_NAME"))

In [105]:
dfWithLongColName.show(2)

+-----------------+-------------------+-----+---------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-----------------+-------------------+-----+---------------------+
|    United States|            Romania|   15|              Romania|
|    United States|            Croatia|    1|              Croatia|
+-----------------+-------------------+-----+---------------------+
only showing top 2 rows



In [106]:
dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`")\
  .show(2)

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



In [107]:
dfWithLongColName.select(expr("`This Long Column-Name`")).columns

['This Long Column-Name']

In [108]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

In [109]:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")

DataFrame[count: bigint, This Long Column-Name: string]

# 2.10 Changing a column's type

In [110]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [111]:
df.withColumn("count", col("count").cast("int")).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



# 2.11 Filtering rows

In [112]:
colCondition = df.filter(col("count") < 2).take(2)
conditional = df.where("count < 2").take(2)

In [113]:
print(colCondition)
print(conditional)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]


In [114]:
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
  .show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



# 2.12 Getting unique rows

In [115]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()


256

In [116]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

125

# 2.13 Sorting

In [117]:
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

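+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows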

In [118]:
from pyspark.sql.functions import desc, asc
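# Caution: as the first output shows, expr("count desc") does not sort descending here;
# use desc("count") or col("count").desc() for a descending sort.
df.orderBy(expr("count desc")).show(2)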
df.orderBy(desc(col("count")), asc(col("DEST_COUNTRY_NAME"))).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



In [121]:
spark.read.format("json").load("data/flight-data/json/*-summary.json")\
  .sortWithinPartitions("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

# 2.14 Limit

In [119]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [120]:
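# Caution: expr("count desc") does not sort descending (the output below is ascending);
# use desc("count") to get the six largest counts.
df.orderBy(expr("count desc")).limit(6).show()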

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
+--------------------+-------------------+-----+



# 2.15 Random Samples


In [125]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

138
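
The sample size is not exactly half of the rows because sampling without replacement keeps each row independently with probability equal to fraction, so the count only lands near the expected value. A minimal plain-Python sketch of that idea follows. The function name bernoulli_sample and the 256 stand-in rows are assumptions made for illustration; Python's random generator differs from Spark's, so the count printed here will not match the 138 above.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (illustrative only)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

# 256 stand-in rows; an assumption, chosen to resemble a small DataFrame.
rows = list(range(256))
sampled = bernoulli_sample(rows, 0.5, seed=5)

# The size is close to, but usually not exactly, 0.5 * 256 = 128.
print(len(sampled))
```

Reusing the same seed reproduces the same sample, which is why the notebook fixes seed before calling sample and randomSplit.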

# 2.16 Random Splits

In [126]:
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() 

False

# 2.17 Concatenating and Appending Rows (Union)

In [127]:
from pyspark.sql import Row
schema = df.schema
newRows = [
  Row("New Country", "Other Country", 5),
  Row("New Country 2", "Other Country 3", 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)


In [128]:

df.union(newDF)\
  .where("count = 1")\
  .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
  .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



# 2.18 Repartition and Coalesce

In [129]:
df.rdd.getNumPartitions()

1

In [130]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [131]:
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [132]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [133]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

# Collecting Rows to the Driver

Spark maintains the state of the cluster in the
driver. There are times when you’ll want to collect some of your data to the driver in
order to manipulate it on your local machine.

In [134]:

collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+
only showing top 5 rows

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [135]:
collectDF.toLocalIterator()

<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f880f84fdd0>

# Working with Different Types of Data

In [136]:
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



# Converting to Spark Types


In [137]:
from pyspark.sql.functions import lit
df.select(lit(5), lit("five"), lit(5.0))

DataFrame[5: int, five: string, 5.0: double]

# Working with Booleans

In [138]:
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
  .select("InvoiceNo", "Description")\
  .show(5, False)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [139]:
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()


+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [140]:
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
  .where("isExpensive")\
  .select("unitPrice", "isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [141]:
from pyspark.sql.functions import expr
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))\
  .where("isExpensive")\
  .select("Description", "UnitPrice").show(5)

+--------------+---------+
|   Description|UnitPrice|
+--------------+---------+
|DOTCOM POSTAGE|   569.77|
|DOTCOM POSTAGE|   607.49|
+--------------+---------+



Null-safe equality test (eqNullSafe treats null as a comparable value, so the result is never null):

In [144]:
df.where(col("Description").eqNullSafe("hello")).show()


+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
+---------+---------+-----------+--------+-----------+---------+----------+-------+
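
To see how a null-safe comparison differs from ordinary equality, here is a minimal plain-Python model with None standing in for SQL null. Both helper names are hypothetical and written only for this note; they are not PySpark functions.

```python
# None plays the role of SQL null. Both helpers are illustrative models.

def sql_equals(a, b):
    """Ordinary SQL '=': the result is null (None) if either side is null."""
    if a is None or b is None:
        return None
    return a == b

def eq_null_safe(a, b):
    """Null-safe equality: always True or False, and null compared to null is True."""
    if a is None or b is None:
        return a is None and b is None
    return a == b

print(sql_equals(None, "hello"))    # None
print(eq_null_safe(None, "hello"))  # False
print(eq_null_safe(None, None))     # True
```

A where clause keeps only rows whose condition is true, so rows with a null Description are dropped under either comparison against "hello"; the difference matters when both sides can be null.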



# Working with Numbers

In [142]:
from pyspark.sql.functions import expr, pow
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [145]:
df.selectExpr(
  "CustomerId",
  "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)


+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [146]:
from pyspark.sql.functions import lit, round, bround

df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
|          3.0|           2.0|
+-------------+--------------+
only showing top 2 rows
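
The two columns differ because round uses half-up rounding while bround uses half-even (banker's) rounding. The same two modes can be tried in plain Python with the decimal module; the helper names half_up and half_even are assumptions made for this sketch.

```python
from decimal import Decimal, ROUND_HALF_UP, ROUND_HALF_EVEN

def half_up(value):
    """Round to an integer, sending ties away from zero (like Spark's round)."""
    return Decimal(value).quantize(Decimal("1"), rounding=ROUND_HALF_UP)

def half_even(value):
    """Round to an integer, sending ties to the nearest even digit (like bround)."""
    return Decimal(value).quantize(Decimal("1"), rounding=ROUND_HALF_EVEN)

for v in ["1.5", "2.5", "3.5"]:
    print(v, half_up(v), half_even(v))
# 1.5 2 2
# 2.5 3 2
# 3.5 4 4
```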



In [147]:
from pyspark.sql.functions import corr
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()

+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+



In [148]:
df.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [149]:

from pyspark.sql.functions import count, mean, stddev_pop, min, max

In [150]:
colName = "UnitPrice"
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile(colName, quantileProbs, relError)

[2.51]

In [151]:
df.stat.crosstab("StockCode", "Quantity").show()

23/01/30 21:31:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|StockCode_Quantity| -1|-10|-12| -2|-24| -3| -4| -5| -6| -7|  1| 10|100| 11| 12|120|128| 13| 14|144| 15| 16| 17| 18| 19|192|  2| 20|200| 21|216| 22| 23| 24| 25|252| 27| 28|288|  3| 30| 32| 33| 34| 36|384|  4| 40|432| 47| 48|480|  5| 50| 56|  6| 60|600| 64|  7| 70| 72|  8| 80|  9| 96|
+------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--

In [152]:
df.stat.freqItems(["StockCode", "Quantity"]).show()


+--------------------+--------------------+
| StockCode_freqItems|  Quantity_freqItems|
+--------------------+--------------------+
|[90214E, 20728, 2...|[200, 128, 23, 32...|
+--------------------+--------------------+



In [153]:
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)


+-----------------------------+
|monotonically_increasing_id()|
+-----------------------------+
|                            0|
|                            1|
+-----------------------------+
only showing top 2 rows
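
The ids come out as 0 and 1 here only because the data sits in a single partition. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits, so ids are unique and increasing but not consecutive across partitions. A small sketch of that layout (sketch_id is a hypothetical helper, not a Spark function):

```python
def sketch_id(partition_id, row_in_partition):
    """Combine a partition id (upper bits) with a per-partition row number (lower 33 bits)."""
    return (partition_id << 33) + row_in_partition

print(sketch_id(0, 0), sketch_id(0, 1))  # 0 1
print(sketch_id(1, 0))                   # 8589934592
```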



# Working with Strings

In [154]:

from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show()

+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
|Cream Cupid Heart...|
|Knitted Union Fla...|
|Red Woolly Hottie...|
|Set 7 Babushka Ne...|
|Glass Star Froste...|
|Hand Warmer Union...|
|Hand Warmer Red P...|
|Assorted Colour B...|
|Poppy's Playhouse...|
|Poppy's Playhouse...|
|Feltcraft Princes...|
|Ivory Knitted Mug...|
|Box Of 6 Assorted...|
|Box Of Vintage Ji...|
|Box Of Vintage Al...|
|Home Building Blo...|
|Love Building Blo...|
|Recipe Box With M...|
+--------------------+
only showing top 20 rows



In [155]:
from pyspark.sql.functions import lower, upper
df.select(col("Description"),
    lower(col("Description")),
    upper(lower(col("Description")))).show(2)

+--------------------+--------------------+-------------------------+
|         Description|  lower(Description)|upper(lower(Description))|
+--------------------+--------------------+-------------------------+
|WHITE HANGING HEA...|white hanging hea...|     WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern|      WHITE METAL LANTERN|
+--------------------+--------------------+-------------------------+
only showing top 2 rows



In [156]:

from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
    ltrim(lit("    HELLO    ")).alias("ltrim"),
    rtrim(lit("    HELLO    ")).alias("rtrim"),
    trim(lit("    HELLO    ")).alias("trim"),
    lpad(lit("HELLO"), 3, " ").alias("lp"),
    rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

+---------+---------+-----+---+----------+
|    ltrim|    rtrim| trim| lp|        rp|
+---------+---------+-----+---+----------+
|HELLO    |    HELLO|HELLO|HEL|HELLO     |
|HELLO    |    HELLO|HELLO|HEL|HELLO     |
+---------+---------+-----+---+----------+
only showing top 2 rows



# Regular Expressions

In [157]:
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(
  regexp_replace(col("Description"), regex_string, "COLOR").alias("color_clean"),
  col("Description")).show(2)

+--------------------+--------------------+
|         color_clean|         Description|
+--------------------+--------------------+
|COLOR HANGING HEA...|WHITE HANGING HEA...|
| COLOR METAL LANTERN| WHITE METAL LANTERN|
+--------------------+--------------------+
only showing top 2 rows



In [158]:
from pyspark.sql.functions import translate
df.select(translate(col("Description"), "LEET", "1337"),col("Description"))\
  .show(2)

+----------------------------------+--------------------+
|translate(Description, LEET, 1337)|         Description|
+----------------------------------+--------------------+
|              WHI73 HANGING H3A...|WHITE HANGING HEA...|
|               WHI73 M37A1 1AN73RN| WHITE METAL LANTERN|
+----------------------------------+--------------------+
only showing top 2 rows
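
translate works character by character rather than on the whole word: L becomes 1, E becomes 3, and T becomes 7. Python's built-in str.translate behaves the same way for this case, which makes the mapping easy to check outside Spark:

```python
# Build a character-level mapping: L->1, E->3, E->3, T->7.
table = str.maketrans("LEET", "1337")

print("WHITE METAL LANTERN".translate(table))  # WHI73 M37A1 1AN73RN
```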



In [159]:

from pyspark.sql.functions import regexp_extract
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
df.select(
     regexp_extract(col("Description"), extract_str, 1).alias("color_clean"),
     col("Description")).show(2)


+-----------+--------------------+
|color_clean|         Description|
+-----------+--------------------+
|      WHITE|WHITE HANGING HEA...|
|      WHITE| WHITE METAL LANTERN|
+-----------+--------------------+
only showing top 2 rows



In [160]:
from pyspark.sql.functions import instr
containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
  .where("hasSimpleColor")\
  .select("Description").show(3, False)


+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|RED WOOLLY HOTTIE WHITE HEART.    |
+----------------------------------+
only showing top 3 rows



In [162]:
from pyspark.sql.functions import expr, locate
simpleColors = ["black", "white", "red", "green", "blue"]
def color_locator(column, color_string):
  return locate(color_string.upper(), column)\
          .cast("boolean")\
          .alias("is_" + color_string)
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*")) # has to be a Column type

df.select(*selectedColumns).where(expr("is_white OR is_red"))\
  .select("Description").show(3, False)

+----------------------------------+
|Description                       |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN               |
|RED WOOLLY HOTTIE WHITE HEART.    |
+----------------------------------+
only showing top 3 rows



# Working with Dates and Timestamps


In [163]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [164]:
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
  .withColumn("today", current_date())\
  .withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")

In [165]:

from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2023-01-25|        2023-02-04|
+------------------+------------------+
only showing top 1 row



In [166]:
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
  .select(datediff(col("week_ago"), col("today"))).show(1)

dateDF.select(
    to_date(lit("2016-01-01")).alias("start"),
    to_date(lit("2017-05-22")).alias("end"))\
  .select(months_between(col("start"), col("end"))).show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+
only showing top 1 row
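
The value -16.67741935 follows from how months_between is documented to work: whole months are counted first, and leftover days are converted assuming a 31-day month, with the result rounded to 8 digits. A minimal plain-Python sketch for date-only inputs follows; the function name is hypothetical and time-of-day handling is left out.

```python
import calendar
from datetime import date

def months_between_sketch(date1, date2):
    """Approximate months_between(date1, date2) for date-only inputs."""
    whole_months = (date1.year - date2.year) * 12 + (date1.month - date2.month)
    last1 = calendar.monthrange(date1.year, date1.month)[1]
    last2 = calendar.monthrange(date2.year, date2.month)[1]
    same_day = date1.day == date2.day
    both_month_ends = date1.day == last1 and date2.day == last2
    if same_day or both_month_ends:
        # Same day of month, or both month ends: a whole number of months.
        return float(whole_months)
    # Leftover days are scaled by a fixed 31-day month, then rounded to 8 digits.
    return round(whole_months + (date1.day - date2.day) / 31.0, 8)

# -16 whole months plus (1 - 22) / 31 leftover days
print(months_between_sketch(date(2016, 1, 1), date(2017, 5, 22)))  # -16.67741935
```

The result is negative because the first argument (start) is earlier than the second (end).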



In [167]:

from pyspark.sql.functions import to_date, lit
spark.range(5).withColumn("date", lit("2017-01-01"))\
  .select(to_date(col("date"))).show(1)


+-------------+
|to_date(date)|
+-------------+
|   2017-01-01|
+-------------+
only showing top 1 row



In [168]:
from pyspark.sql.functions import to_date
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(
    to_date(lit("2017-12-11"), dateFormat).alias("date"),
    to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.createOrReplaceTempView("dateTable2")

In [169]:
from pyspark.sql.functions import to_timestamp
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()


+------------------------------+
|to_timestamp(date, yyyy-dd-MM)|
+------------------------------+
|           2017-11-12 00:00:00|
+------------------------------+
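
The result is 12 November rather than 11 December because the pattern "yyyy-dd-MM" puts the day before the month. The same parse can be reproduced with Python's datetime, where "%Y-%d-%m" is the rough equivalent of the Spark pattern (the two pattern languages are different, so this is only an analogy):

```python
from datetime import datetime

# Python's "%Y-%d-%m" plays the role of the Spark pattern "yyyy-dd-MM".
fmt = "%Y-%d-%m"

print(datetime.strptime("2017-12-11", fmt).date())  # 2017-11-12
print(datetime.strptime("2017-20-12", fmt).date())  # 2017-12-20
```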



# Working with Nulls in Data

# Coalesce

Spark's coalesce function selects the first non-null value from a set of columns.
This is the column function from pyspark.sql.functions, not the DataFrame.coalesce method used earlier to reduce the number of partitions.

In [171]:
from pyspark.sql.functions import coalesce
df.select(coalesce(col("Description"), col("CustomerId"))).show()

+---------------------------------+
|coalesce(Description, CustomerId)|
+---------------------------------+
|             WHITE HANGING HEA...|
|              WHITE METAL LANTERN|
|             CREAM CUPID HEART...|
|             KNITTED UNION FLA...|
|             RED WOOLLY HOTTIE...|
|             SET 7 BABUSHKA NE...|
|             GLASS STAR FROSTE...|
|             HAND WARMER UNION...|
|             HAND WARMER RED P...|
|             ASSORTED COLOUR B...|
|             POPPY'S PLAYHOUSE...|
|             POPPY'S PLAYHOUSE...|
|             FELTCRAFT PRINCES...|
|             IVORY KNITTED MUG...|
|             BOX OF 6 ASSORTED...|
|             BOX OF VINTAGE JI...|
|             BOX OF VINTAGE AL...|
|             HOME BUILDING BLO...|
|             LOVE BUILDING BLO...|
|             RECIPE BOX WITH M...|
+---------------------------------+
only showing top 20 rows



# ifnull, nullIf, nvl, and nvl2

There are several other SQL functions that you can use to achieve similar things.
ifnull returns the second value if the first is null, and otherwise returns the first.
nullif returns null if the two values are equal, and otherwise returns the first.
nvl returns the second value if the first is null, and otherwise returns the first.
Finally, nvl2 returns the second value if the first is not null; otherwise, it returns the last specified value.
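
To make the four definitions concrete, here is a minimal plain-Python sketch of their semantics, with None standing in for SQL null. The helper names mirror the SQL functions but are hypothetical stand-ins written for this note, not part of the PySpark API; in Spark you would call the SQL functions themselves, for example inside selectExpr.

```python
# Plain-Python model of the SQL null helpers; None plays the role of null.
# These functions are illustrative stand-ins, not PySpark APIs.

def ifnull(first, second):
    """Return second when first is null, otherwise first."""
    return second if first is None else first

def nullif(first, second):
    """Return null when the two values are equal, otherwise first."""
    return None if first == second else first

def nvl(first, second):
    """Same behaviour as ifnull."""
    return ifnull(first, second)

def nvl2(first, second, third):
    """Return second when first is NOT null, otherwise third."""
    return second if first is not None else third

print(ifnull(None, "return_value"))                    # return_value
print(nullif("value", "value"))                        # None
print(nvl(None, "return_value"))                       # return_value
print(nvl2("not_null", "return_value", "else_value"))  # return_value
```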

# drop

In [172]:
df.na.drop("all", subset=["StockCode", "InvoiceNo"])

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

# fill

In [173]:
df.na.fill("all", subset=["StockCode", "InvoiceNo"])


DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [174]:
fill_cols_vals = {"StockCode": 5, "Description" : "No Value"}
df.na.fill(fill_cols_vals)

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

# replace

In [175]:
df.na.replace([""], ["UNKNOWN"], "Description")

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

# Working with Complex Types

# Structs


In [176]:
df.selectExpr("(Description, InvoiceNo) as complex", "*")
df.selectExpr("struct(Description, InvoiceNo) as complex", "*")

DataFrame[complex: struct<Description:string,InvoiceNo:string>, InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [177]:
from pyspark.sql.functions import struct
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")



# Arrays

# split


In [178]:
from pyspark.sql.functions import split
df.select(split(col("Description"), " ")).show(2)

+-------------------------+
|split(Description,  , -1)|
+-------------------------+
|     [WHITE, HANGING, ...|
|     [WHITE, METAL, LA...|
+-------------------------+
only showing top 2 rows



In [179]:
df.select(split(col("Description"), " ").alias("array_col"))\
  .selectExpr("array_col[0]").show(2)


+------------+
|array_col[0]|
+------------+
|       WHITE|
|       WHITE|
+------------+
only showing top 2 rows



# Array Length

In [180]:
from pyspark.sql.functions import size
df.select(size(split(col("Description"), " "))).show(2) 

+-------------------------------+
|size(split(Description,  , -1))|
+-------------------------------+
|                              5|
|                              3|
+-------------------------------+
only showing top 2 rows



# array_contains

In [181]:
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)

+------------------------------------------------+
|array_contains(split(Description,  , -1), WHITE)|
+------------------------------------------------+
|                                            true|
|                                            true|
+------------------------------------------------+
only showing top 2 rows



# explode

In [182]:
from pyspark.sql.functions import split, explode

df.withColumn("splitted", split(col("Description"), " "))\
  .withColumn("exploded", explode(col("splitted")))\
  .select("Description", "InvoiceNo", "exploded").show(2)

+--------------------+---------+--------+
|         Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...|   536365|   WHITE|
|WHITE HANGING HEA...|   536365| HANGING|
+--------------------+---------+--------+
only showing top 2 rows



# Maps

In [183]:
from pyspark.sql.functions import create_map

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .show(2)

+--------------------+
|         complex_map|
+--------------------+
|{WHITE HANGING HE...|
|{WHITE METAL LANT...|
+--------------------+
only showing top 2 rows



In [184]:

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .selectExpr("complex_map['WHITE METAL LANTERN']").show(2)

+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|                            null|
|                          536365|
+--------------------------------+
only showing top 2 rows



In [185]:
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
  .selectExpr("explode(complex_map)").show(2)

+--------------------+------+
|                 key| value|
+--------------------+------+
|WHITE HANGING HEA...|536365|
| WHITE METAL LANTERN|536365|
+--------------------+------+
only showing top 2 rows



# Working with JSON

In [186]:
jsonDF = spark.range(1).selectExpr("""
  '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

In [187]:
from pyspark.sql.functions import get_json_object, json_tuple

jsonDF.select(
    get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]"),
    json_tuple(col("jsonString"), "myJSONKey")).show(2)

+-------------------------------------------------------+--------------------+
|get_json_object(jsonString, $.myJSONKey.myJSONValue[1])|                  c0|
+-------------------------------------------------------+--------------------+
|                                                      2|{"myJSONValue":[1...|
+-------------------------------------------------------+--------------------+



In [188]:
from pyspark.sql.functions import to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct")))

DataFrame[to_json(myStruct): string]

In [189]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
parseSchema = StructType((
  StructField("InvoiceNo",StringType(),True),
  StructField("Description",StringType(),True)))
df.selectExpr("(InvoiceNo, Description) as myStruct")\
  .select(to_json(col("myStruct")).alias("newJSON"))\
  .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)

+--------------------+--------------------+
|  from_json(newJSON)|             newJSON|
+--------------------+--------------------+
|{536365, WHITE HA...|{"InvoiceNo":"536...|
|{536365, WHITE ME...|{"InvoiceNo":"536...|
+--------------------+--------------------+
only showing top 2 rows



# User-Defined Functions

The first step is the actual function.

In [190]:
udfExampleDF = spark.range(5).toDF("num")
def power3(double_value):
  return double_value ** 3
power3(2.0)

8.0

Register the function as a UDF to make it available as a DataFrame function.

In [192]:
from pyspark.sql.functions import udf
power3udf = udf(power3)

In [193]:
from pyspark.sql.functions import col
udfExampleDF.select(power3udf(col("num"))).show(2)

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
+-----------+
only showing top 2 rows

Register this UDF as a Spark SQL function.

In [194]:
from pyspark.sql.types import IntegerType, DoubleType
spark.udf.register("power3py", power3, DoubleType())



<function __main__.power3(double_value)>

In [195]:
udfExampleDF.selectExpr("power3py(num)").show(2)

+-------------+
|power3py(num)|
+-------------+
|         null|
|         null|
+-------------+
only showing top 2 rows
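The nulls above are consistent with a return-type mismatch: spark.range produces integers, so power3 returns a Python int, while the function was registered with DoubleType(). PySpark does not convert the int to a double and returns null instead. A minimal plain-Python sketch of the mismatch and one possible fix follows; the name power3_double is hypothetical and not part of the original notebook.

```python
def power3(value):
    # Same function as in the notebook: the result type follows the input type
    return value ** 3

def power3_double(value):
    # Hypothetical variant: convert first so the result is always a float,
    # which matches a UDF registered with DoubleType()
    return float(value) ** 3

print(type(power3(2)).__name__)
print(type(power3_double(2)).__name__)
```

Registering power3_double with DoubleType(), or registering power3 with LongType(), would be expected to return values instead of nulls.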



# Day 3 (chapter 7)

In [197]:
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("data/retail-data/all/*.csv")\
  .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")



23/01/30 22:17:08 WARN CacheManager: Asked to cache already cached data.


                                                                                

# 1. Aggregation Functions

# 1.1 count

In [198]:
from pyspark.sql.functions import count
df.select(count("StockCode")).show() 



+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



                                                                                

# 1.2 countDistinct

In [200]:

from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()



+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



                                                                                

In [201]:
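from pyspark.sql.functions import approx_count_distinct
# The second argument is the maximum relative standard deviation allowed (rsd);
# a larger value trades accuracy for speed
df.select(approx_count_distinct("StockCode", 0.1)).show()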



+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



                                                                                

# 1.3 first and last

In [202]:
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



# 1.4 min and max

In [203]:
# Note: these imports shadow Python's built-in min and max for the rest of the session
from pyspark.sql.functions import min, max
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



# 1.5 sum

In [204]:

from pyspark.sql.functions import sum
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



# 1.6 sumDistinct

In [207]:

# sumDistinct is deprecated since Spark 3.2; sum_distinct is its replacement
from pyspark.sql.functions import sum_distinct
df.select(sum_distinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [208]:
from pyspark.sql.functions import sum, count, avg, expr

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



# 1.7 Variance and Standard Deviation

In [209]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),
  stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+------------------+------------------+--------------------+---------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+------------------+------------------+--------------------+---------------------+
|47559.303646609056|47559.391409298754|  218.08095663447796|   218.08115785023418|
+------------------+------------------+--------------------+---------------------+
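The population and sample variants differ only in the divisor: n for the population formulas and n - 1 for the sample formulas. With 541909 rows the two results are almost identical, as the output above shows. A small plain-Python sketch with made-up values (not taken from the dataset) makes the difference visible:

```python
import statistics

# Made-up sample values for illustration only
quantities = [6.0, 6.0, 8.0, 6.0, 6.0, 2.0]
n = len(quantities)

var_pop = statistics.pvariance(quantities)   # divides by n
var_samp = statistics.variance(quantities)   # divides by n - 1
stddev_pop = statistics.pstdev(quantities)   # square root of var_pop
stddev_samp = statistics.stdev(quantities)   # square root of var_samp

print(var_pop, var_samp)
print(stddev_pop, stddev_samp)
```

The sample variance equals the population variance multiplied by n / (n - 1), so the gap shrinks as the number of rows grows.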



# 1.8 skewness and kurtosis

In [210]:
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+-------------------+------------------+
| skewness(Quantity)|kurtosis(Quantity)|
+-------------------+------------------+
|-0.2640755761052562|119768.05495536952|
+-------------------+------------------+



# 1.9 Covariance and Correlation


In [211]:
from pyspark.sql.functions import corr, covar_pop, covar_samp
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity")).show()




+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085635685E-4|             1052.7280543902734|            1052.7260778741693|
+-------------------------+-------------------------------+------------------------------+



                                                                                

# 1.10 Aggregating to Complex Types

In [212]:
from pyspark.sql.functions import collect_set, collect_list
df.agg(collect_set("Country"), collect_list("Country")).show()

                                                                                

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



# 1.11 Grouping


In [213]:
from pyspark.sql.functions import count

df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show()



+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536370|  20|             20|
|   536380|   1|              1|
|   536384|  13|             13|
|   536387|   5|              5|
|   536397|   2|              2|
|   536405|   1|              1|
|   536407|   2|              2|
|   536463|   1|              1|
|   536500|  15|             15|
|   536522|  54|             54|
|   536523|  12|             12|
|   536536|   3|              3|
|   536538|  31|             31|
|   536542|  16|             16|
|   536555|   2|              2|
|   536561|  15|             15|
|   536573|   4|              4|
|   536579|   2|              2|
|   536580|   6|              6|
|   536582|  17|             17|
+---------+----+---------------+
only showing top 20 rows



                                                                                

In [214]:
df.groupBy("InvoiceNo", "CustomerId").count().show()



+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536366|     17850|    2|
|   536367|     13047|   12|
|   536369|     13047|    1|
|   536376|     15291|    2|
|   536387|     16029|    5|
|  C536391|     17548|    7|
|   536392|     13705|   10|
|   536399|     17850|    2|
|   536403|     12791|    2|
|   536405|     14045|    1|
|   536415|     12838|   59|
|   536446|     15983|   32|
|   536463|     14849|    1|
|   536464|     17968|   85|
|   536500|     17377|   15|
|   536525|     14078|   13|
|   536529|     14237|    9|
|   536531|     15485|   23|
|   536532|     12433|   73|
|   536533|     16955|    6|
+---------+----------+-----+
only showing top 20 rows



                                                                                

# 1.12 Grouping with Expressions

In [215]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
  .show()



+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536370|             22.45|   8.935742834258381|
|   536380|              24.0|                 0.0|
|   536384|14.615384615384615|  15.750645708563392|
|   536387|             288.0|  117.57550765359255|
|   536397|              30.0|                18.0|
|   536405|             128.0|                 0.0|
|   536407|               6.0|                 0.0|
|   536463|              12.0|                 0.0|
|   536500|               6.8|   4.019950248448356|
|   536522|1.5925925925925926|  1.6046058535136642|
|   536523| 9.333333333333334|   7.487025815072067|
|   536536|31.666666666666668|  34.373762603991366|
|   536538| 4.709677419354839|  3.7173833008743054|
|   536542|              24.5|    8.73212459828649|
|   536555|               1.0|                 0.0|
|   536561| 9.333333333333334|  2.9814239699997196|
|   536573| 

                                                                                

# 1.13 Grouping with Maps

In [217]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
 .show()


+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536370|             22.45|   8.935742834258381|
|   536380|              24.0|                 0.0|
|   536384|14.615384615384615|  15.750645708563392|
|   536387|             288.0|  117.57550765359255|
|   536397|              30.0|                18.0|
|   536405|             128.0|                 0.0|
|   536407|               6.0|                 0.0|
|   536463|              12.0|                 0.0|
|   536500|               6.8|   4.019950248448356|
|   536522|1.5925925925925926|  1.6046058535136642|
|   536523| 9.333333333333334|   7.487025815072067|
|   536536|31.666666666666668|  34.373762603991366|
|   536538| 4.709677419354839|  3.7173833008743054|
|   536542|              24.5|    8.73212459828649|
|   536555|               1.0|                 0.0|
|   536561| 9.333333333333334|  2.9814239699997196|
|   536573| 

# 1.14 Window Functions

In [218]:
from pyspark.sql.functions import col, to_date
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")


In [219]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
  .partitionBy("CustomerId", "date")\
  .orderBy(desc("Quantity"))\
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [220]:

from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [221]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)
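rank and dense_rank only differ after a tie: rank skips positions, dense_rank does not. The following plain-Python sketch mimics both for a single window partition ordered by descending quantity. The helper name rank_and_dense_rank is hypothetical, and the trailing value 6 is made up to show what happens after the tied rows.

```python
def rank_and_dense_rank(values):
    """Return (value, rank, dense_rank) tuples for values sorted in descending order."""
    ordered = sorted(values, reverse=True)
    result = []
    rank = 0
    dense = 0
    previous = None
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            rank = position   # rank jumps to the row position after a tie
            dense += 1        # dense_rank advances by exactly one
            previous = value
        result.append((value, rank, dense))
    return result

rows = rank_and_dense_rank([36, 30, 24, 12, 12, 12, 12, 6])
for row in rows:
    print(row)
```

The four tied rows share rank 4 under both schemes; the next row then gets rank 8 but dense rank 5.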

In [222]:
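from pyspark.sql.functions import col
# Switch to the legacy (Spark 2.x) date parser, which is more lenient when
# to_date applies the "MM/d/yyyy H:mm" pattern to the InvoiceDate strings
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")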

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()


+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

                                                                                

# 1.15 Grouping Sets


In [223]:
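# Note: drop() with no arguments removes nothing, so dfNoNull still contains
# nulls; dfWithDate.na.drop() would be needed to discard rows with null values
dfNoNull = dfWithDate.drop()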
dfNoNull.createOrReplaceTempView("dfNoNull")

# 1.16 Rollups

In [225]:
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
  .orderBy("Date")
rolledUpDF.show()



+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|     Australia|           107|
|2010-12-01|       Germany|           117|
|2010-12-01|        Norway|          1852|
|2010-12-01|          EIRE|           243|
|2010-12-01|        France|           449|
|2010-12-01|          null|         26814|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|   Netherlands|            97|
|2010-12-02|          null|         21023|
|2010-12-02|       Germany|           146|
|2010-12-02|United Kingdom|         20873|
|2010-12-02|          EIRE|             4|
|2010-12-03|       Belgium|           528|
|2010-12-03|   Switzerland|           110|
|2010-12-03|          EIRE|          2575|
|2010-12-03|         Spain|           400|
|2010-12-03|       Germany|           170|
|2010-12-03|        France|           239|
|2010-12-03|      Portugal|            65|
+----------

                                                                                

In [226]:
rolledUpDF.where("Country IS NULL").show()




+----------+-------+--------------+
|      Date|Country|total_quantity|
+----------+-------+--------------+
|      null|   null|       5176450|
|2010-12-01|   null|         26814|
|2010-12-02|   null|         21023|
|2010-12-03|   null|         14830|
|2010-12-05|   null|         16395|
|2010-12-06|   null|         21419|
|2010-12-07|   null|         24995|
|2010-12-08|   null|         22741|
|2010-12-09|   null|         18431|
|2010-12-10|   null|         20297|
|2010-12-12|   null|         10565|
|2010-12-13|   null|         17623|
|2010-12-14|   null|         20098|
|2010-12-15|   null|         18229|
|2010-12-16|   null|         29632|
|2010-12-17|   null|         16069|
|2010-12-19|   null|          3795|
|2010-12-20|   null|         14965|
|2010-12-21|   null|         15467|
|2010-12-22|   null|          3192|
+----------+-------+--------------+
only showing top 20 rows



                                                                                

In [227]:
rolledUpDF.where("Date IS NULL").show()



+----+-------+--------------+
|Date|Country|total_quantity|
+----+-------+--------------+
|null|   null|       5176450|
+----+-------+--------------+



                                                                                

# 1.17 Cube

In [228]:
from pyspark.sql.functions import sum

dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()



+----+---------------+-------------+
|Date|        Country|sum(Quantity)|
+----+---------------+-------------+
|null|        Finland|        10666|
|null| Czech Republic|          592|
|null|      Singapore|         5234|
|null|         Israel|         4353|
|null|        Lebanon|          386|
|null|         Norway|        19247|
|null|        Germany|       117448|
|null|Channel Islands|         9479|
|null|           EIRE|       142637|
|null|         Greece|         1556|
|null|    Switzerland|        30325|
|null|        Belgium|        23152|
|null|         Poland|         3653|
|null|        Denmark|         8188|
|null|       Portugal|        16180|
|null|      Hong Kong|         4769|
|null|      Lithuania|          652|
|null|          Italy|         7999|
|null|      Australia|        83653|
|null|          Spain|        26824|
+----+---------------+-------------+
only showing top 20 rows
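The difference between rollup and cube lies in which column combinations are aggregated. A rollup over (Date, Country) produces totals for (Date, Country), (Date) and the grand total; a cube additionally produces totals per Country across all dates, which is why the output above contains rows with a null Date and a non-null Country. A plain-Python sketch of this idea follows, using three rows taken from the rollup output; the aggregate helper is hypothetical.

```python
from collections import defaultdict
from itertools import combinations

COLUMNS = ("Date", "Country")

# Three (Date, Country, Quantity) rows taken from the rollup output above
rows = [
    ("2010-12-01", "Germany", 117),
    ("2010-12-01", "France", 449),
    ("2010-12-02", "Germany", 146),
]

def aggregate(rows, grouping_sets):
    """Sum Quantity once per grouping set; left-out columns become None (null)."""
    totals = defaultdict(int)
    for date, country, quantity in rows:
        record = {"Date": date, "Country": country}
        for keys in grouping_sets:
            group = tuple(record[c] if c in keys else None for c in COLUMNS)
            totals[group] += quantity
    return dict(totals)

# rollup: hierarchical prefixes of the column list
rollup_sets = [COLUMNS[:i] for i in range(len(COLUMNS), -1, -1)]
# cube: every subset of the column list
cube_sets = [s for r in range(len(COLUMNS), -1, -1) for s in combinations(COLUMNS, r)]

rollup = aggregate(rows, rollup_sets)
cube = aggregate(rows, cube_sets)

print(rollup[(None, None)])
print(cube[(None, "Germany")])
```

Both results contain the grand total under (None, None), but only the cube contains per-country totals such as (None, "Germany").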



                                                                                

# 1.18 Pivot

In [229]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

# Day 4 (chapter 8)

# 1. Joins

In [265]:
summ1=spark.read.csv("data/flight-data/csv/2010-summary.csv",header=True,inferSchema=True)
summ2=spark.read.csv("data/flight-data/csv/2011-summary.csv",header=True,inferSchema=True)
summ3=spark.read.csv("data/flight-data/csv/2012-summary.csv",header=True,inferSchema=True)
summ4=spark.read.csv("data/flight-data/csv/2013-summary.csv",header=True,inferSchema=True)
summ5=spark.read.csv("data/flight-data/csv/2014-summary.csv",header=True,inferSchema=True)
summ6=spark.read.csv("data/flight-data/csv/2015-summary.csv",header=True,inferSchema=True)

In [266]:
print(summ1.columns)
print(summ2.columns)
print(summ3.columns)
print(summ4.columns)
print(summ5.columns)
print(summ6.columns)

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']


In [230]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")


In [231]:
joinExpression = person["graduate_program"] == graduateProgram['id']

In [232]:
wrongJoinExpression = person["name"] == graduateProgram["school"]

In [233]:
joinType = "inner"

In [234]:
gradProgram2 = graduateProgram.union(spark.createDataFrame([
    (0, "Masters", "Duplicated Row", "Duplicated School")]))

gradProgram2.createOrReplaceTempView("gradProgram2")




# Inner Join

In [237]:
person.join(graduateProgram, joinExpression).show()



+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



                                                                                

In [267]:
inner_join = summ1.join(summ2, ["DEST_COUNTRY_NAME"], "inner")
inner_join.show(5)

+-----------------+-------------------+-----+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+--------------------+-----+
|    United States|            Romania|    1|               Haiti|  197|
|    United States|            Romania|    1|       French Guiana|   11|
|    United States|            Romania|    1|Saint Kitts and N...|  120|
|    United States|            Romania|    1| Trinidad and Tobago|  213|
|    United States|            Romania|    1|             Bolivia|   51|
+-----------------+-------------------+-----+--------------------+-----+
only showing top 5 rows



# Outer Joins

In [270]:
full_outer_join = summ1.join(summ5, ["DEST_COUNTRY_NAME"], "full_outer")
full_outer_join.show(5)

+-------------------+-------------------+-----+-------------------+-----+
|  DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+-------------------+-----+
|        Afghanistan|      United States|   11|               null| null|
|            Algeria|               null| null|      United States|    9|
|             Angola|      United States|   14|      United States|   13|
|           Anguilla|      United States|   21|      United States|   34|
|Antigua and Barbuda|      United States|  123|      United States|  115|
+-------------------+-------------------+-----+-------------------+-----+
only showing top 5 rows



In [238]:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()


+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



# Left Outer Joins

In [268]:
left_outer_join = summ3.join(summ4, ["DEST_COUNTRY_NAME"], "left_outer")
left_outer_join.show(5)

+-----------------+-------------------+-----+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+--------------------+-----+
|    United States|            Croatia|    1|               Haiti|  186|
|    United States|            Croatia|    1|       French Guiana|    3|
|    United States|            Croatia|    1|Saint Kitts and N...|  115|
|    United States|            Croatia|    1|             Bolivia|   13|
|    United States|            Croatia|    1| Trinidad and Tobago|  184|
+-----------------+-------------------+-----+--------------------+-----+
only showing top 5 rows



In [239]:
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()


+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



# Right Outer Joins

In [269]:
right_outer_join = summ5.join(summ6, ["DEST_COUNTRY_NAME"], "right_outer")
right_outer_join.show(5)

+-----------------+--------------------+-----+-------------------+-----+
|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|ORIGIN_COUNTRY_NAME|count|
+-----------------+--------------------+-----+-------------------+-----+
|    United States|               Haiti|  193|            Romania|   15|
|    United States|Saint Kitts and N...|  123|            Romania|   15|
|    United States|       French Guiana|    4|            Romania|   15|
|    United States|             Bolivia|   14|            Romania|   15|
|    United States| Trinidad and Tobago|  175|            Romania|   15|
+-----------------+--------------------+-----+-------------------+-----+
only showing top 5 rows



In [240]:
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



# Left Semi Joins


In [241]:
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()


+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [271]:
left_semi_join = summ1.join(summ3, summ1.DEST_COUNTRY_NAME == summ3.DEST_COUNTRY_NAME, "leftsemi")
left_semi_join.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|    United States|          Singapore|   25|
+-----------------+-------------------+-----+
only showing top 5 rows



In [242]:
gradProgram2 = graduateProgram.union(spark.createDataFrame([
 (0, "Masters", "Duplicated Row", "Duplicated School")]))
gradProgram2.createOrReplaceTempView("gradProgram2")
gradProgram2.join(person, joinExpression, joinType).show()



+---+-------+--------------------+-----------------+
| id| degree|          department|           school|
+---+-------+--------------------+-----------------+
|  0|Masters|School of Informa...|      UC Berkeley|
|  1|  Ph.D.|                EECS|      UC Berkeley|
|  0|Masters|      Duplicated Row|Duplicated School|
+---+-------+--------------------+-----------------+



# Left Anti Joins

In [243]:
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()


+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+
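Left semi and left anti joins act as filters on the left table: a semi join keeps left rows that have at least one match on the right, an anti join keeps those that have none, and neither adds columns from the right side. A plain-Python sketch using the same graduateProgram and person data as above (the snake_case variable names are only for this sketch):

```python
graduate_program = [
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley"),
]
person = [
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100]),
]

# Program ids referenced by at least one person (the graduate_program column)
referenced_ids = {p[2] for p in person}

# Left semi: keep programs with a matching person
left_semi = [g for g in graduate_program if g[0] in referenced_ids]
# Left anti: keep programs without any matching person
left_anti = [g for g in graduate_program if g[0] not in referenced_ids]

print([g[0] for g in left_semi])
print([g[0] for g in left_anti])
```

Because the result is a filter, a program matched by several people still appears only once in the semi join.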



In [272]:
left_anti_join = summ2.join(summ4, summ2.DEST_COUNTRY_NAME == summ4.DEST_COUNTRY_NAME, "leftanti")
left_anti_join.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Malta|      United States|    1|
|            Yemen|      United States|    1|
|       The Gambia|      United States|    1|
|           Guinea|      United States|    5|
|          Croatia|      United States|    2|
+-----------------+-------------------+-----+
only showing top 5 rows



# Cross (Cartesian) Joins


In [273]:
cross_join = summ2.crossJoin(summ5)
cross_join.show(5)

+-----------------+-------------------+-----+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+-----------------+-------------------+-----+
|    United States|       Saint Martin|    2|    United States|       Saint Martin|    1|
|    United States|       Saint Martin|    2|    United States|            Romania|   12|
|    United States|       Saint Martin|    2|    United States|            Croatia|    2|
|    United States|       Saint Martin|    2|    United States|            Ireland|  291|
|    United States|       Saint Martin|    2|    United States|              India|   62|
+-----------------+-------------------+-----+-----------------+-------------------+-----+
only showing top 5 rows



In [244]:
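joinType = "cross"
# Because a join expression is supplied, this returns only the matching rows
# (like an inner join); crossJoin below produces the full Cartesian product
graduateProgram.join(person, joinExpression, joinType).show()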


+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
| id| degree|          department|     school| id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  2|Michael Armbrust|               1|     [250, 100]|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
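The three rows above can be understood as a Cartesian product followed by a filter on the join condition. A plain-Python sketch of this, reduced to the id columns of the two tables (the variable names are only for this sketch):

```python
from itertools import product

# (person id, graduate_program) pairs and program ids from the tables above
person_programs = [(0, 0), (1, 1), (2, 1)]
program_ids = [0, 2, 1]

# Full Cartesian product: every program paired with every person
cartesian = list(product(program_ids, person_programs))

# Applying the join condition (program id == person's graduate_program)
matched = [(pid, p) for pid, p in cartesian if p[1] == pid]

print(len(cartesian))
print(len(matched))
```

The unfiltered product has 3 x 3 = 9 rows, while only 3 pairs satisfy the condition, which matches the row count shown above.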



In [245]:
person.crossJoin(graduateProgram).show()


+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  2|Masters|                EECS|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  2|Masters|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  0|Masters|School of Informa...|UC 

# Challenges When Using Joins

# Joins on Complex Types

In [246]:
from pyspark.sql.functions import expr
person.withColumnRenamed("id", "personId")\
 .join(sparkStatus, expr("array_contains(spark_status, id)")).show()

+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+



# Handling Duplicate Column Names

1: Renaming the columns: You can use the withColumnRenamed method to rename the duplicate columns before the join. For example:

In [274]:
summ1.withColumnRenamed("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME_1").show(5)
summ2.withColumnRenamed("DEST_COUNTRY_NAME", "DEST_COUNTRY_NAME_2").show(5)


+-------------------+-------------------+-----+
|DEST_COUNTRY_NAME_1|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|      United States|            Romania|    1|
|      United States|            Ireland|  264|
|      United States|              India|   69|
|              Egypt|      United States|   24|
|  Equatorial Guinea|      United States|    1|
+-------------------+-------------------+-----+
only showing top 5 rows

+-------------------+-------------------+-----+
|DEST_COUNTRY_NAME_2|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|      United States|       Saint Martin|    2|
|      United States|             Guinea|    2|
|      United States|            Croatia|    1|
|      United States|            Romania|    3|
|      United States|            Ireland|  268|
+-------------------+-------------------+-----+
only showing top 5 rows



2: Using an alias: When selecting columns, you can use the alias method, or the SQL AS keyword inside selectExpr, to give the duplicate column a new name. For example:

In [275]:
summ1.select(summ1["DEST_COUNTRY_NAME"].alias("DEST_COUNTRY_NAME_1")).show(5)
summ2.selectExpr("DEST_COUNTRY_NAME as DEST_COUNTRY_NAME_1").show(5)

+-------------------+
|DEST_COUNTRY_NAME_1|
+-------------------+
|      United States|
|      United States|
|      United States|
|              Egypt|
|  Equatorial Guinea|
+-------------------+
only showing top 5 rows

+-------------------+
|DEST_COUNTRY_NAME_1|
+-------------------+
|      United States|
|      United States|
|      United States|
|      United States|
|      United States|
+-------------------+
only showing top 5 rows



3: Using the withColumn method: You can use the withColumn method to add a new column with a new name, and then drop the original column. For example:

In [276]:
summ= summ1.withColumn("DEST_COUNTRY_NAME_1", summ1["DEST_COUNTRY_NAME"])
summ.drop("DEST_COUNTRY_NAME").show(5)

+-------------------+-----+-------------------+
|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME_1|
+-------------------+-----+-------------------+
|            Romania|    1|      United States|
|            Ireland|  264|      United States|
|              India|   69|      United States|
|      United States|   24|              Egypt|
|      United States|    1|  Equatorial Guinea|
+-------------------+-----+-------------------+
only showing top 5 rows



4: Using the select method: You can use the select method to select only the columns you need from the DataFrame, which will remove the duplicate columns. For example:

In [277]:
summ.select("DEST_COUNTRY_NAME_1","count","ORIGIN_COUNTRY_NAME").show(5)

+-------------------+-----+-------------------+
|DEST_COUNTRY_NAME_1|count|ORIGIN_COUNTRY_NAME|
+-------------------+-----+-------------------+
|      United States|    1|            Romania|
|      United States|  264|            Ireland|
|      United States|   69|              India|
|              Egypt|   24|      United States|
|  Equatorial Guinea|    1|      United States|
+-------------------+-----+-------------------+
only showing top 5 rows



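5: Using the drop method: You can use the drop method to drop duplicate columns after joining the DataFrames. Dropping by name removes every column with that name, from both sides of the join. For example:

In [278]:
# named "joined" rather than "sum" to avoid shadowing Python's built-in sum()
joined = summ.join(summ2, summ1.DEST_COUNTRY_NAME == summ2.DEST_COUNTRY_NAME, "inner")
joined.drop("DEST_COUNTRY_NAME").show(5)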

+-------------------+-----+-------------------+--------------------+-----+
|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME_1| ORIGIN_COUNTRY_NAME|count|
+-------------------+-----+-------------------+--------------------+-----+
|            Romania|    1|      United States|               Haiti|  197|
|            Romania|    1|      United States|       French Guiana|   11|
|            Romania|    1|      United States|Saint Kitts and N...|  120|
|            Romania|    1|      United States| Trinidad and Tobago|  213|
|            Romania|    1|      United States|             Bolivia|   51|
+-------------------+-----+-------------------+--------------------+-----+
only showing top 5 rows



In [254]:
gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")

# Approach 1: Different join expression

When you have two keys that have the same name, probably the easiest fix is to change the join expression from a Boolean expression to a string or sequence. This automatically removes one of the columns for you during the join:

# Approach 2: Dropping the column after the join

Another approach is to drop the offending column after the join. When doing this, we need to refer to the column via the original source DataFrame. We can do this if the join uses the same key names or if the source DataFrames have columns that simply have the same name:

# Approach 3: Renaming a column before the join

We can avoid this issue altogether if we rename one of our columns before the join:

# 3. How Spark performs joins

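In PySpark, joins are performed with the join method of a DataFrame, which takes the other DataFrame, a join expression (or the name of the join column), and the join type. The basic syntax for joining two DataFrames is shown below. Because this example joins summ1 to itself, Spark logs a warning that suggests using aliases: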

In [280]:
summ1.join(summ1, summ1.DEST_COUNTRY_NAME == summ1.DEST_COUNTRY_NAME, "inner").show(5)

23/01/30 23:24:57 WARN Column: Constructing trivially true equals predicate, 'DEST_COUNTRY_NAME#12044 = DEST_COUNTRY_NAME#12044'. Perhaps you need to use aliases.
+-----------------+-------------------+-----+-----------------+--------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+-----------------+--------------------+-----+
|    United States|            Romania|    1|    United States|              Uganda|    1|
|    United States|            Romania|    1|    United States|               Haiti|  226|
|    United States|            Romania|    1|    United States|       French Guiana|    1|
|    United States|            Romania|    1|    United States|Saint Kitts and N...|  127|
|    United States|            Romania|    1|    United States|            Slovakia|    1|
+-----------------+-------------------+-----+-----------------+--------------------+-----+
only showing top 5 rows

When the join method is called, Spark chooses a physical join strategy based on the size of the two DataFrames and then runs it in parallel across the cluster. The main pieces are:

Broadcast the smaller DataFrame: If one of the DataFrames is small enough (by default, below the spark.sql.autoBroadcastJoinThreshold setting of 10 MB), Spark sends a full copy of it to every executor. The larger DataFrame is then joined in place, and no shuffle is needed.

Partition the data: Each DataFrame is split into partitions that are distributed across the worker nodes. Partitions are chunks of a DataFrame and the unit of parallelism; they are not separate RDDs.

Shuffle the data: When neither DataFrame is small enough to broadcast, both are shuffled so that all the rows with the same join key end up in the same partition. This step is what allows the join to run in parallel, and it is usually the most expensive part.

Perform the join: Each task performs the join locally on its partition of the data, based on the join condition specified in the join method.

Return the results: The joined partitions together form the resulting DataFrame, which stays distributed. Rows are brought back to the driver only when an action such as show or collect is called.

It's important to note that the performance of the join operation depends on the distribution of the data and the size of the DataFrames. If the join keys are skewed, a few partitions receive most of the rows and the tasks that process them become a bottleneck. When one DataFrame is small, a broadcast join (also called a map-side join) avoids the shuffle entirely. When both DataFrames are large and are joined repeatedly on the same key, bucketing them on that key in advance can also avoid the shuffle.
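To make the difference between broadcasting and shuffling concrete, here is a small pure-Python sketch. It only imitates the data movement on in-memory lists and is not how Spark is implemented; the function names and the toy rows are assumptions made for this illustration.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows, key):
    """Join every partition of the large side against a full copy of the small side."""
    lookup = defaultdict(list)
    for row in small_rows:               # built once, then "sent" to every partition
        lookup[row[key]].append(row)
    joined = []
    for partition in large_partitions:   # the large side never moves between partitions
        for left in partition:
            for right in lookup.get(left[key], []):
                joined.append({**left, **right})
    return joined

def shuffle_hash_join(left_rows, right_rows, key, num_partitions):
    """Repartition both sides by key hash, then join partition by partition."""
    left_parts, right_parts = defaultdict(list), defaultdict(list)
    for row in left_rows:                # the "shuffle": equal keys land in the same partition
        left_parts[hash(row[key]) % num_partitions].append(row)
    for row in right_rows:
        right_parts[hash(row[key]) % num_partitions].append(row)
    joined = []
    for p in range(num_partitions):      # each partition is joined independently
        lookup = defaultdict(list)
        for row in right_parts[p]:
            lookup[row[key]].append(row)
        for left in left_parts[p]:
            for right in lookup.get(left[key], []):
                joined.append({**left, **right})
    return joined

# Toy rows modelled on the person and graduateProgram tables used earlier.
people = [
    {"name": "Bill Chambers", "graduate_program": 0},
    {"name": "Matei Zaharia", "graduate_program": 1},
    {"name": "Michael Armbrust", "graduate_program": 1},
]
programs = [
    {"graduate_program": 0, "degree": "Masters"},
    {"graduate_program": 1, "degree": "Ph.D."},
    {"graduate_program": 2, "degree": "Masters"},
]

via_broadcast = broadcast_hash_join([people[:2], people[2:]], programs, "graduate_program")
via_shuffle = shuffle_hash_join(people, programs, "graduate_program", num_partitions=4)
print(via_broadcast)
print(via_shuffle)
```

Both functions return the same joined rows (possibly in a different order). The difference is which side has to move: only the small side in the broadcast case, both sides in the shuffle case.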

# Day 5 (chapter 9)

# 1. Datasources

# 1.1. Basics of reading data

The entry point for reading data in PySpark is the read attribute of the SparkSession object, which returns a DataFrameReader.

Here is an example of how to read a CSV file:

In [281]:
#sum1 = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

In this example, we use the read.csv method of an existing SparkSession (spark) to read the CSV file located at the specified path. Setting header to True uses the first row of the file as the column names, and setting inferSchema to True makes PySpark infer the data types of the columns, which requires an extra pass over the data.

You can also read other file formats, such as JSON and Parquet, by using spark.read.json() and spark.read.parquet() respectively.

# 2. Basics of writing data

The entry point for writing data in PySpark is the write attribute of the DataFrame object, which returns a DataFrameWriter.

Here is an example of how to write a DataFrame to a CSV file:

In [282]:
#df.write.csv("path/to/new_file.csv", header=True)

In this example, we use the write.csv method to write an existing DataFrame (df) to the specified path. Spark creates a directory at that path containing one file per partition, rather than a single CSV file. Setting header to True writes the column names as the first row of each file.

You can also write other file formats, such as JSON and Parquet, by using df.write.json() and df.write.parquet() respectively. Additionally, you can write to relational databases such as MySQL with the df.write.jdbc() method, and to Hive tables with df.write.saveAsTable().

# 3. CSV files - reading, writing

In [283]:
csvFile = spark.read.format("csv")\
  .option("header", "true")\
  .option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load("data/flight-data/csv/2010-summary.csv")

In [284]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
  .save("/tmp/my-tsv-file.tsv")

# 4. JSON files - reading, writing

In [285]:
spark.read.format("json").option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load("data/flight-data/json/2010-summary.json").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [286]:
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")


# 5. Parquet files 

Parquet is a columnar storage format that is widely used in the Apache Hadoop ecosystem and is supported by many big data processing frameworks, including PySpark. There are several reasons why Parquet is important in PySpark:

Efficiency: Parquet stores data in a columnar format, which means that only the required columns are read and processed, rather than reading and processing the entire row. This leads to significant performance improvements when working with large datasets.

Compression: Parquet supports various compression algorithms, such as Snappy and Gzip, which can greatly reduce the storage space required for large datasets.

Schema evolution: Parquet supports schema evolution, which means that a dataset's schema can be changed over time without having to rewrite the entire dataset. This is particularly useful when working with data that is constantly changing or evolving.

Predicate pushdown: Parquet supports predicate pushdown, which means that filtering conditions can be pushed down to the storage layer, rather than being applied in the query layer. This leads to further performance improvements when working with large datasets.

Interoperability: Parquet is an open standard, which means that it can be used with a wide variety of data processing frameworks, including PySpark, Hive, Pig, and Impala.
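The efficiency and predicate pushdown points can be illustrated with a toy columnar layout in plain Python. This sketch is only an analogy: real Parquet files store encoded and compressed column chunks with statistics in the file metadata, and the helper names to_row_groups and scan are assumptions made for this example.

```python
# Sample rows taken from the flight summary output shown in this notebook.
names = ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"]
rows = [
    ("United States", "Romania", 1),
    ("United States", "Ireland", 264),
    ("United States", "India", 69),
    ("Egypt", "United States", 24),
    ("Equatorial Guinea", "United States", 1),
]

def to_row_groups(rows, names, group_size):
    """Store each group of rows column by column, with min/max statistics per column."""
    groups = []
    for start in range(0, len(rows), group_size):
        chunk = rows[start:start + group_size]
        columns = {name: [r[i] for r in chunk] for i, name in enumerate(names)}
        stats = {name: (min(vals), max(vals)) for name, vals in columns.items()}
        groups.append({"columns": columns, "stats": stats})
    return groups

def scan(groups, select, filter_col, min_value):
    """Return `select` values where filter_col >= min_value, touching as little data as possible."""
    selected, groups_read = [], 0
    for group in groups:
        # Predicate pushdown: if the group's max is below the threshold, skip it unread.
        if group["stats"][filter_col][1] < min_value:
            continue
        groups_read += 1
        # Column pruning: only the filtered and the selected column are read.
        for value, check in zip(group["columns"][select], group["columns"][filter_col]):
            if check >= min_value:
                selected.append(value)
    return selected, groups_read

groups = to_row_groups(rows, names, group_size=2)
result, groups_read = scan(groups, "ORIGIN_COUNTRY_NAME", "count", 100)
print(result, groups_read)
```

With three row groups, only the first one has a maximum count of at least 100, so the other two are skipped without being read.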

# 6. PARQUET files - reading, writing

To read and write Parquet files in PySpark, you can use the read.parquet() and write.parquet() methods, or the equivalent format("parquet") form used in this example:

In [291]:
spark.read.format("parquet")\
  .load("data/flight-data/parquet/2010-summary.parquet").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [292]:
csvFile.write.format("parquet").mode("overwrite")\
  .save("/tmp/my-parquet-file.parquet")

# 7. ORC

ORC (Optimized Row Columnar) is a file format that is similar to Parquet and is also widely used in the Apache Hadoop ecosystem. Like Parquet, ORC is a columnar storage format that is designed to improve the performance and storage efficiency of big data processing frameworks, such as PySpark.

Here are some of the key benefits of using ORC in PySpark:

Performance: ORC stores data in a columnar format, which leads to significant performance improvements when working with large datasets.

Compression: ORC supports various compression algorithms, such as Snappy, Zlib, and LZO, which can greatly reduce the storage space required for large datasets.

Schema evolution: ORC supports schema evolution, which means that a dataset's schema can be changed over time without having to rewrite the entire dataset.

Predicate pushdown: ORC supports predicate pushdown, which means that filtering conditions can be pushed down to the storage layer, rather than being applied in the query layer.

Interoperability: ORC is an open standard, which means that it can be used with a wide variety of data processing frameworks, including PySpark, Hive, Pig, and Impala.

# 8. ORC files - reading, writing

In [293]:
spark.read.format("orc").load("data/flight-data/orc/2010-summary.orc").show(5)


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [295]:
csvFile.write.format("orc").mode("overwrite").save("/tmp/my-orc-file.orc")


# 9. Reading from SQL Databases

In [301]:
driver = "org.sqlite.JDBC"
path = "data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:" + path
tablename = "flight_info"


In [302]:
dbDataFrame = spark.read.format("jdbc").option("url", url)\
  .option("dbtable", tablename).option("driver",  driver).load()

# 10. Query Pushdown

Spark makes a best-effort attempt to filter data in the database itself before creating the DataFrame.

In [303]:
dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain()

== Physical Plan ==
*(1) Scan JDBCRelation(flight_info) [numPartitions=1] [DEST_COUNTRY_NAME#12763,ORIGIN_COUNTRY_NAME#12764,count#12765] PushedFilters: [*In(DEST_COUNTRY_NAME, [Anguilla,Sweden])], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:decimal(20,0)>




In [304]:
pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info)
  AS flight_info"""
dbDataFrame = spark.read.format("jdbc")\
  .option("url", url).option("dbtable", pushdownQuery).option("driver",  driver)\
  .load()

# 11. Reading from databases in parallel

In [305]:
dbDataFrame = spark.read.format("jdbc")\
  .option("url", url).option("dbtable", tablename).option("driver",  driver)\
  .option("numPartitions", 10).load()


In [306]:
props = {"driver":"org.sqlite.JDBC"}
predicates = [
  "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
  "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).show()
spark.read.jdbc(url,tablename,predicates=predicates,properties=props)\
  .rdd.getNumPartitions()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Sweden|      United States|   65|
|    United States|             Sweden|   73|
|         Anguilla|      United States|   21|
|    United States|           Anguilla|   20|
+-----------------+-------------------+-----+



2

In [307]:
props = {"driver":"org.sqlite.JDBC"}
predicates = [
  "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
  "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).count()



510
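Each predicate defines one partition, and Spark reads the rows matching each predicate separately. When the predicates overlap, as in the cell above, the same row is read more than once, which is why the count is 510 rather than 255. The sketch below reproduces the effect with Python's built-in sqlite3 module on a handful of toy rows; the in-memory table and the rows_read helper are assumptions made for this illustration.

```python
import sqlite3

# Toy in-memory stand-in for the flight_info table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE flight_info "
    "(DEST_COUNTRY_NAME TEXT, ORIGIN_COUNTRY_NAME TEXT, count INTEGER)")
rows = [
    ("Sweden", "United States", 65),
    ("United States", "Sweden", 73),
    ("Anguilla", "United States", 21),
    ("United States", "Anguilla", 20),
    ("United States", "Romania", 1),
]
conn.executemany("INSERT INTO flight_info VALUES (?, ?, ?)", rows)

def rows_read(predicates):
    """Total rows fetched when every predicate is run as its own partition query."""
    total = 0
    for predicate in predicates:
        query = f"SELECT COUNT(*) FROM flight_info WHERE {predicate}"
        total += conn.execute(query).fetchone()[0]
    return total

disjoint = [
    "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
    "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'",
]
overlapping = [
    "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
    "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'",
]

print(rows_read(disjoint))     # each matching row is read once
print(rows_read(overlapping))  # every row satisfies both predicates, so each is read twice
```

To avoid duplicates, the predicates should be written so that every row matches exactly one of them.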

# 12. Partitioning based on a sliding window

In [308]:
colName = "count"
lowerBound = 0
upperBound = 348113 # this is the max count in our database
numPartitions = 10

In [309]:

spark.read.jdbc(url, tablename, column=colName, properties=props,
                lowerBound=lowerBound, upperBound=upperBound,
                numPartitions=numPartitions).count() 

255
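As a rough illustration of how the column, lowerBound, upperBound, and numPartitions arguments above turn into per-partition queries, the sketch below derives one WHERE clause per partition. It is a simplified approximation written for this note, not Spark's actual implementation, and the function name partition_where_clauses is hypothetical.

```python
def partition_where_clauses(column, lower_bound, upper_bound, num_partitions):
    """Split [lower_bound, upper_bound) into equal strides and build one WHERE clause each."""
    if num_partitions < 2:
        raise ValueError("this sketch expects at least two partitions")
    stride = (upper_bound - lower_bound) // num_partitions
    boundaries = [lower_bound + stride * i for i in range(1, num_partitions)]
    clauses = []
    for i in range(num_partitions):
        if i == 0:
            # The first partition is open at the bottom and also takes NULLs.
            clauses.append(f"{column} < {boundaries[0]} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # The last partition is open at the top.
            clauses.append(f"{column} >= {boundaries[-1]}")
        else:
            clauses.append(
                f"{column} >= {boundaries[i - 1]} AND {column} < {boundaries[i]}")
    return clauses

clauses = partition_where_clauses("count", 0, 348113, 10)
for clause in clauses:
    print(clause)
```

The bounds only set the stride; they do not filter rows. Values below lowerBound or above upperBound fall into the first or last partition, which is why the count above is still 255.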

# 13. Writing to SQL Databases

In [310]:
newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.jdbc(newPath, tablename, mode="overwrite", properties=props)

23/01/31 10:49:41 WARN JdbcUtils: Requested isolation level 1 is not supported; falling back to default isolation level 8


In [311]:
spark.read.jdbc(newPath, tablename, properties=props).count()

255

In [312]:
csvFile.write.jdbc(newPath, tablename, mode="append", properties=props)

23/01/31 10:49:53 WARN JdbcUtils: Requested isolation level 1 is not supported; falling back to default isolation level 8


In [313]:
spark.read.jdbc(newPath, tablename, properties=props).count()

510

# 14. Partitioning


Partitioning is a tool that allows you to control what data is stored (and where) as you
write it. When you write a file to a partitioned directory (or table), you basically
encode a column as a folder. What this allows you to do is skip lots of data when you
go to read it in later, allowing you to read in only the data relevant to your problem
instead of having to scan the complete dataset. These are supported for all file-based
data sources:


In [317]:
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")\
 .save("/tmp/partitioned-files.parquet")
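The "column encoded as a folder" idea can be sketched without Spark. The example below writes a few rows into one directory per distinct value of the partition column, using column=value directory names. The write_partitioned helper, the CSV format, and the part-00000.csv file name are assumptions chosen for this illustration.

```python
import csv
import os
import tempfile
from collections import defaultdict

rows = [
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Romania", "count": 1},
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Ireland", "count": 264},
    {"DEST_COUNTRY_NAME": "Egypt", "ORIGIN_COUNTRY_NAME": "United States", "count": 24},
]

def write_partitioned(rows, base_dir, partition_col):
    """Write one sub-directory per distinct value of partition_col."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_col]].append(row)
    for value, group in groups.items():
        part_dir = os.path.join(base_dir, f"{partition_col}={value}")
        os.makedirs(part_dir, exist_ok=True)
        # The partition value lives in the directory name, so it is left out of the file.
        data_cols = [c for c in group[0] if c != partition_col]
        with open(os.path.join(part_dir, "part-00000.csv"), "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=data_cols, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(group)

base_dir = tempfile.mkdtemp()
write_partitioned(rows, base_dir, "DEST_COUNTRY_NAME")
partition_dirs = sorted(os.listdir(base_dir))
print(partition_dirs)
```

A reader that only needs flights to Egypt can open the DEST_COUNTRY_NAME=Egypt directory and ignore the rest.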


                                                                                

# 15. Bucketing

Bucketing is another file organization approach with which you can control the data that is specifically written to each file. This can help avoid shuffles later when you go to read the data because data with the same bucket ID will all be grouped together into one physical partition. This means that the data is prepartitioned according to how you expect to use that data later on, meaning you can avoid expensive shuffles when joining or aggregating.

Rather than partitioning on a specific column (which might write out a ton of directories), it’s probably worthwhile to explore bucketing the data instead. This will create a certain number of files and organize our data into those “buckets”:

In [319]:
numberBuckets = 10
columnToBucketBy = "count"
csvFile.write.format("parquet").mode("overwrite")\
 .bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
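The way rows are assigned to buckets can be sketched in plain Python: hash the bucketing column and take the result modulo the number of buckets. Spark uses its own Murmur3-based hash; zlib.crc32 is used here only as a deterministic stand-in, and the helper names and toy rows are assumptions made for this illustration.

```python
import zlib
from collections import defaultdict

def bucket_id(value, num_buckets):
    """Deterministic bucket for a value (crc32 stands in for Spark's hash function)."""
    return zlib.crc32(str(value).encode("utf-8")) % num_buckets

def bucketize(rows, column, num_buckets):
    """Group rows into a fixed number of buckets by the hash of one column."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_id(row[column], num_buckets)].append(row)
    return buckets

flights_2010 = [
    {"dest": "United States", "count": 1},
    {"dest": "Egypt", "count": 24},
    {"dest": "United States", "count": 264},
]
flights_2015 = [
    {"dest": "Egypt", "count": 24},
    {"dest": "United States", "count": 264},
]

buckets_2010 = bucketize(flights_2010, "count", 4)
buckets_2015 = bucketize(flights_2015, "count", 4)
print(dict(buckets_2010))
print(dict(buckets_2015))
```

Because both datasets use the same column and the same number of buckets, rows with equal count values always share a bucket ID, so a join on that column can proceed bucket by bucket without a shuffle.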

                                                                                

# SPARK SQL (chapter 10)

# Importing

In [325]:
spark.read.json("data/flight-data/json/2015-summary.json")\
  .createOrReplaceTempView("summary") # DF => SQL

spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count)
FROM summary GROUP BY DEST_COUNTRY_NAME
""")\
  .where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10")\
  .count()

12

In [326]:
spark.sql('SELECT * FROM summary').show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

1. How many rows (origin and destination pairs) does the dataset contain?


In [327]:
spark.sql('SELECT COUNT(*) AS ROWS_count FROM summary').show()

+----------+
|ROWS_count|
+----------+
|       256|
+----------+



2. What are the top 10 destination countries by count?

In [329]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 10\
          ").show()

+------------------+-----------+
| DEST_COUNTRY_NAME|total_count|
+------------------+-----------+
|     United States|     411352|
|            Canada|       8399|
|            Mexico|       7140|
|    United Kingdom|       2025|
|             Japan|       1548|
|           Germany|       1468|
|Dominican Republic|       1353|
|       South Korea|       1048|
|       The Bahamas|        955|
|            France|        935|
+------------------+-----------+



3. How many flights originated from the United States?

In [331]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE ORIGIN_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|    411966|
+----------+



4. What are the top 5 origin countries for flights to Japan?

In [332]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          WHERE DEST_COUNTRY_NAME = 'Japan'\
          GROUP BY ORIGIN_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 5\
          ").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|       1548|
+-------------------+-----------+



5. What is the total number of flights to the United States?

In [333]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE DEST_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|    411352|
+----------+



6. What is the total number of flights from the United States?

In [334]:
spark.sql("SELECT SUM(count)\
          FROM summary\
          WHERE ORIGIN_COUNTRY_NAME = 'United States'\
          ").show()

+----------+
|sum(count)|
+----------+
|    411966|
+----------+



7. What are the top 10 origin and destination pairs by count?

In [335]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
          LIMIT 10\
").show()

+-------------------+-----------------+-----------+
|ORIGIN_COUNTRY_NAME|DEST_COUNTRY_NAME|total_count|
+-------------------+-----------------+-----------+
|      United States|    United States|     370002|
|             Canada|    United States|       8483|
|      United States|           Canada|       8399|
|             Mexico|    United States|       7187|
|      United States|           Mexico|       7140|
|      United States|   United Kingdom|       2025|
|     United Kingdom|    United States|       1970|
|      United States|            Japan|       1548|
|              Japan|    United States|       1496|
|      United States|          Germany|       1468|
+-------------------+-----------------+-----------+



8. How many flights originated from each country?

In [336]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY ORIGIN_COUNTRY_NAME\
          ORDER BY total_count DESC\
").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|     411966|
|             Canada|       8483|
|             Mexico|       7187|
|     United Kingdom|       1970|
|              Japan|       1496|
| Dominican Republic|       1420|
|            Germany|       1336|
|        The Bahamas|        986|
|             France|        952|
|              China|        920|
|           Colombia|        867|
|        South Korea|        827|
|            Jamaica|        712|
|        Netherlands|        660|
|             Brazil|        619|
|         Costa Rica|        608|
|        El Salvador|        508|
|               Cuba|        478|
|             Panama|        465|
|              Spain|        442|
+-------------------+-----------+
only showing top 20 rows



9. How many flights went to each country?

In [337]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
          FROM summary\
          GROUP BY DEST_COUNTRY_NAME\
          ORDER BY total_count DESC\
").show()

+------------------+-----------+
| DEST_COUNTRY_NAME|total_count|
+------------------+-----------+
|     United States|     411352|
|            Canada|       8399|
|            Mexico|       7140|
|    United Kingdom|       2025|
|             Japan|       1548|
|           Germany|       1468|
|Dominican Republic|       1353|
|       South Korea|       1048|
|       The Bahamas|        955|
|            France|        935|
|          Colombia|        873|
|            Brazil|        853|
|       Netherlands|        776|
|             China|        772|
|           Jamaica|        666|
|        Costa Rica|        588|
|       El Salvador|        561|
|            Panama|        510|
|              Cuba|        466|
|             Spain|        420|
+------------------+-----------+
only showing top 20 rows



10. What is the total number of flights between the United States and Canada?

In [338]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE (ORIGIN_COUNTRY_NAME = 'United States' AND DEST_COUNTRY_NAME = 'Canada') OR (ORIGIN_COUNTRY_NAME = 'Canada' AND DEST_COUNTRY_NAME = 'United States')\
").show()


+----------+
|sum(count)|
+----------+
|     16882|
+----------+



11. What are the 5 most common origin countries for flights to the United Kingdom?

In [339]:
spark.sql("SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total_count\
  FROM summary\
  WHERE DEST_COUNTRY_NAME = 'United Kingdom'\
  GROUP BY ORIGIN_COUNTRY_NAME\
  ORDER BY total_count DESC\
  LIMIT 5\
").show()

+-------------------+-----------+
|ORIGIN_COUNTRY_NAME|total_count|
+-------------------+-----------+
|      United States|       2025|
+-------------------+-----------+



12. What are the top 10 destination countries for flights from China?

In [340]:
spark.sql("\
  SELECT DEST_COUNTRY_NAME, SUM(count) as total_count\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'China'\
  GROUP BY DEST_COUNTRY_NAME\
  ORDER BY total_count DESC\
  LIMIT 10\
").show()

+-----------------+-----------+
|DEST_COUNTRY_NAME|total_count|
+-----------------+-----------+
|    United States|        920|
+-----------------+-----------+



13. What is the total number of flights between the United States and New Zealand?


In [341]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE (ORIGIN_COUNTRY_NAME = 'United States' AND DEST_COUNTRY_NAME = 'New Zealand') OR (ORIGIN_COUNTRY_NAME = 'New Zealand' AND DEST_COUNTRY_NAME = 'United States')\
").show()


+----------+
|sum(count)|
+----------+
|       185|
+----------+



14. What is the total number of flights from India?

In [342]:
spark.sql("SELECT SUM(count)\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'India'\
").show()

+----------+
|sum(count)|
+----------+
|        62|
+----------+



15. How do the destination countries rank by total number of flights?

In [344]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count,\
  RANK() OVER (ORDER BY SUM(count) DESC) as rank\
  FROM summary\
  GROUP BY DEST_COUNTRY_NAME\
").show()

23/01/31 11:13:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---------

16. How do the destination countries rank by number of flights from France?

In [345]:
spark.sql("SELECT DEST_COUNTRY_NAME, SUM(count) as total_count,\
  RANK() OVER (ORDER BY SUM(count) DESC) as rank\
  FROM summary\
  WHERE ORIGIN_COUNTRY_NAME = 'France'\
  GROUP BY DEST_COUNTRY_NAME\
").show()

23/01/31 11:14:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---------

17. What is the cumulative sum of flights to each destination country, ordered by the number of flights?

In [346]:
# Running total of flights per destination, accumulated from the busiest destination down.
spark.sql("""
  SELECT DEST_COUNTRY_NAME, SUM(count) AS total_count,
    SUM(SUM(count)) OVER (ORDER BY SUM(count) DESC) AS cumulative_sum
  FROM summary
  GROUP BY DEST_COUNTRY_NAME
  ORDER BY total_count DESC
""").show()

23/01/31 11:14:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/01/31 1
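
The two window queries above are easier to follow on a tiny example. The sketch below re-implements, in plain Python and on made-up totals (hypothetical values, not taken from the flight data), what RANK() OVER (ORDER BY ... DESC) and SUM(...) OVER (ORDER BY ... DESC) compute. It assumes the default frame that applies when only ORDER BY is given (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), under which tied rows share both their rank and their cumulative sum. The function name `rank_and_running_sum` is introduced here for illustration only.

```python
def rank_and_running_sum(totals):
    """Mimic RANK() and a default-frame running SUM over totals sorted descending.

    totals: dict mapping a name to its total (hypothetical sample data).
    Returns a list of (name, total, rank, cumulative_sum) tuples.
    """
    ordered = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    rows = []
    for name, total in ordered:
        # RANK(): 1 + number of rows that sort strictly before this one,
        # so ties share a rank and the next distinct value skips ahead.
        rank = 1 + sum(1 for _, other in ordered if other > total)
        # RANGE frame: the running sum covers every row whose total is
        # greater than or equal to the current one, so ties get the same value.
        cumulative = sum(other for _, other in ordered if other >= total)
        rows.append((name, total, rank, cumulative))
    return rows


sample = {"A": 50, "B": 30, "C": 30, "D": 10}  # made-up totals
for row in rank_and_running_sum(sample):
    print(row)
```

If a strictly row-by-row running total is wanted instead, the SQL window would need an explicit ROWS frame; the sketch above deliberately mirrors the default behaviour.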

# How Spark Runs on a Cluster (chapter 15)

# SparkSession


In [347]:
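from pyspark.sql import SparkSession
# getOrCreate() returns the session that is already running in this notebook,
# so master() and appName() are ignored here; only runtime SQL configurations
# are applied (see the warning below).
spark = SparkSession.builder.master("local").appName("Word Count")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()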

23/01/31 11:17:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# Logical instructions to physical execution

In [348]:

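# Two ranges starting at 2: one with step 2, one with step 4
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
# repartition() forces a shuffle (an Exchange in the physical plan)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
# The join shuffles both sides by id before the sort-merge join
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")

step4.collect()  # action: runs the job
step4.explain()  # prints the physical plan shown below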



== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(7) HashAggregate(keys=[], functions=[sum(id#13405L)])
   +- ShuffleQueryStage 4
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#8229]
         +- *(6) HashAggregate(keys=[], functions=[partial_sum(id#13405L)])
            +- *(6) Project [id#13405L]
               +- *(6) SortMergeJoin [id#13405L], [id#13399L], Inner
                  :- *(4) Sort [id#13405L ASC NULLS FIRST], false, 0
                  :  +- ShuffleQueryStage 2
                  :     +- Exchange hashpartitioning(id#13405L, 5), ENSURE_REQUIREMENTS, [id=#8108]
                  :        +- *(3) Project [(id#13397L * 5) AS id#13405L]
                  :           +- ShuffleQueryStage 0
                  :              +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [id=#8041]
                  :                 +- *(1) Range (2, 10000000, step=2, splits=8)
                  +- *(5) Sort [id#13399L ASC NULLS FIRST], fal
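
The result of `collect()` is not shown in the output above, but the arithmetic can be checked without Spark. The sketch below reproduces it in plain Python: after `id * 5` the left side holds multiples of 10, the right side holds 2, 6, 10, ..., and the inner join keeps only ids present on both sides. The helper name `joined_sum` and its `limit` parameter are introduced here for illustration only.

```python
def joined_sum(limit=10_000_000):
    """Sum of the ids that survive the inner join in the cell above."""
    left = range(10, 5 * limit, 10)   # step2: (2, 4, 6, ...) * 5
    right = range(2, limit, 4)        # df2: 2, 6, 10, ...
    # Membership tests on a range are O(1), so only the right side is scanned.
    return sum(i for i in right if i in left)


print(joined_sum())  # 2500000000000
```

The surviving ids are 10, 30, 50, ..., 9999990: 500,000 values with an average of 5,000,000, which gives the total printed above.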

                                                                                

# Spark UI (chapter 18)

When Spark runs in local mode, navigate to http://localhost:4040 to see the Spark UI
while the application is running on your local machine. The UI has the following tabs:

• The Jobs tab refers to Spark jobs.

• The Stages tab pertains to individual stages (and their relevant tasks).

• The Storage tab includes information about the data that is currently cached in our Spark Application.

• The Environment tab contains relevant information about the configurations and current settings of the Spark application.

• The SQL tab refers to our Structured API queries (including SQL and DataFrames).

• The Executors tab provides detailed information about each executor running our application.
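
As a small convenience, the tabs listed above can be turned into direct links. This is a minimal sketch: the tab paths are the ones the Spark 3.x web UI appears to serve and should be treated as an assumption to verify against your own version, and `tab_urls` is a hypothetical helper name. In a live session the base address can be read from `spark.sparkContext.uiWebUrl`, which is useful because Spark picks another port when 4040 is already taken.

```python
# Tab paths as served by the Spark 3.x web UI (an assumption; check your version).
TAB_PATHS = {
    "Jobs": "/jobs/",
    "Stages": "/stages/",
    "Storage": "/storage/",
    "Environment": "/environment/",
    "Executors": "/executors/",
    "SQL": "/SQL/",
}


def tab_urls(base_url="http://localhost:4040"):
    """Return a dict mapping each UI tab name to its full URL."""
    base = base_url.rstrip("/")
    return {tab: base + path for tab, path in TAB_PATHS.items()}


for tab, url in tab_urls().items():
    print(f"{tab:12s} {url}")

# With a running session (assumes `spark` from the cells above):
# tab_urls(spark.sparkContext.uiWebUrl)
```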

In [350]:
spark.read\
 .option("header", "true")\
 .csv("data/retail-data/all/online-retail-dataset.csv")\
 .repartition(2)\
 .selectExpr("instr(Description, 'GLASS') >= 1 as is_glass")\
 .groupBy("is_glass")\
 .count()\
 .collect()

                                                                                

[Row(is_glass=False, count=527594),
 Row(is_glass=None, count=1454),
 Row(is_glass=True, count=12861)]
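
The three groups in this result (False, None, True) follow from SQL null handling: `instr` returns a 1-based position, or 0 when the substring is absent, and any comparison involving NULL yields NULL. The plain-Python sketch below mimics that logic on a few illustrative descriptions (sample values chosen for the example, not a claim about specific rows); `is_glass` is a hypothetical helper name.

```python
from collections import Counter


def is_glass(description):
    """Mimic instr(Description, 'GLASS') >= 1, including SQL null handling."""
    if description is None:
        return None  # NULL in, NULL out: the row lands in the is_glass=None group
    # instr() is 1-based and returns 0 when absent; str.find() is 0-based
    # and returns -1 when absent, so add 1 to line the two up.
    return description.find("GLASS") + 1 >= 1


sample = ["GLASS STAR FROSTED T-LIGHT HOLDER", "WHITE METAL LANTERN", None]
print(Counter(is_glass(d) for d in sample))
```

Note that the match is case-sensitive, so a description containing only "glass" in lower case would count as False.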

# Performance Tuning (chapter 19)

# Temporary Data Storage (Caching)

In [351]:
DF1 = spark.read.format("csv")\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .load("data/flight-data/csv/2015-summary.csv")
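# Without caching, each action below re-reads and re-parses the CSV file.
# Note: collect() returns a list of Row objects, not a DataFrame.
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().collect()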

In [352]:
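# cache() is lazy: it only marks DF1 for caching.
DF1.cache()
# count() is an action, so it reads the file once and fills the cache.
DF1.count()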

256

In [353]:
# The same three aggregations now read DF1 from the cache instead of the CSV file.
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().collect()
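
To see whether caching actually helps, the same aggregation can be timed before and after `DF1.cache()`. The helper below is a minimal sketch; the name `timed` is introduced here for illustration and is not part of PySpark. On a file as small as this one (256 rows) the difference may be negligible or dominated by job start-up overhead.

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start


# Stand-in workload so the snippet runs on its own:
total, seconds = timed(lambda: sum(range(1_000_000)))
print(f"result={total}, took {seconds:.4f}s")

# In the notebook (assumes DF1 from the cells above):
# rows, seconds = timed(lambda: DF1.groupBy("DEST_COUNTRY_NAME").count().collect())
```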