# The Spark DataFrame

A Spark application consists of a **driver process** and a set of **executor processes**.

Driver process:
- Is responsible for analysing, distributing and scheduling work across the executors
- Is responsible for maintaining information about the Spark application

Executor processes:
- Carry out the work that the driver process assigns to them

In [1]:
from pyspark.sql import SparkSession


In [2]:
spark =  SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/24 00:17:08 WARN Utils: Your hostname, billy-Yoga-9-15IMH5, resolves to a loopback address: 127.0.1.1; using 10.107.122.146 instead (on interface wlp0s20f3)
26/02/24 00:17:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/24 00:17:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

### DataFrames

A PySpark DataFrame can be created:
- With the createDataFrame method, passing a list of lists, tuples, dictionaries or pyspark.sql.Row objects
- From a pandas DataFrame
- From an RDD
- By reading a file or a database (via a JDBC driver)

In [16]:
from pyspark.sql import Row
import pandas as pd
import os
from datetime import datetime, date

In [5]:
df = spark.createDataFrame([
    Row(a=1, b=0.45, c='Homer', d=datetime.today(), e=datetime.now()),
    Row(a=1, b=0.89, c='Marcus Aurellius', d=datetime.today(), e=datetime.now()),
    Row(a=1, b=0.78, c='Ceasar', d=datetime.today(), e=datetime.now())
])
df

DataFrame[a: bigint, b: double, c: string, d: timestamp, e: timestamp]

In [6]:
df.show()

                                                                                

+---+----+----------------+--------------------+--------------------+
|  a|   b|               c|                   d|                   e|
+---+----+----------------+--------------------+--------------------+
|  1|0.45|           Homer|2026-02-19 22:27:...|2026-02-19 22:27:...|
|  1|0.89|Marcus Aurellius|2026-02-19 22:27:...|2026-02-19 22:27:...|
|  1|0.78|          Ceasar|2026-02-19 22:27:...|2026-02-19 22:27:...|
+---+----+----------------+--------------------+--------------------+



In [7]:
df.describe()

DataFrame[summary: string, a: string, b: string, c: string]

In [8]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: timestamp (nullable = true)
 |-- e: timestamp (nullable = true)



In [10]:
df.select("d").show()

+--------------------+
|                   d|
+--------------------+
|2026-02-19 22:27:...|
|2026-02-19 22:27:...|
|2026-02-19 22:27:...|
+--------------------+



##### We can also register the DataFrame as a SQL temporary view

In [12]:
df.createOrReplaceTempView("df_table")

In [13]:
sqldf = spark.sql("SELECT * FROM df_table")
sqldf.show()

+---+----+----------------+--------------------+--------------------+
|  a|   b|               c|                   d|                   e|
+---+----+----------------+--------------------+--------------------+
|  1|0.45|           Homer|2026-02-19 22:27:...|2026-02-19 22:27:...|
|  1|0.89|Marcus Aurellius|2026-02-19 22:27:...|2026-02-19 22:27:...|
|  1|0.78|          Ceasar|2026-02-19 22:27:...|2026-02-19 22:27:...|
+---+----+----------------+--------------------+--------------------+



In [15]:
spark.sql("SELECT * FROM df_table WHERE b > 0.5").show()

+---+----+----------------+--------------------+--------------------+
|  a|   b|               c|                   d|                   e|
+---+----+----------------+--------------------+--------------------+
|  1|0.89|Marcus Aurellius|2026-02-19 22:27:...|2026-02-19 22:27:...|
|  1|0.78|          Ceasar|2026-02-19 22:27:...|2026-02-19 22:27:...|
+---+----+----------------+--------------------+--------------------+



## Read In Data

In [20]:
flight_data = (
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("data/2015-summary.csv")
)

In [19]:
os.getcwd()

'/home/billy/Documents/Spark_concepts'

In [21]:
flight_data.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [22]:
flight_data.show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows


In [23]:
flight_data.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

#### Reading, sorting and collecting a DataFrame

In [24]:
flight_data.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#147 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#147 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=100]
      +- FileScan csv [DEST_COUNTRY_NAME#145,ORIGIN_COUNTRY_NAME#146,count#147] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/billy/Documents/Spark_concepts/data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




- Nothing happens to the data when we call sort, because it is just a transformation; explain() only prints the plan that Spark would run.
- sort does not modify the DataFrame: it returns a new DataFrame derived from the previous one.

In [27]:
spark.conf.set("spark.sql.shuffle.partitions","10")

In [28]:
flight_data.sort("count").take(5)

[Row(DEST_COUNTRY_NAME='Malta', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Saint Vincent and the Grenadines', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Gibraltar', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

In [29]:
flight_data.createOrReplaceTempView("flight_data_table")

In [30]:
sqlWay = spark.sql("""SELECT DEST_COUNTRY_NAME, 
                    count(1) FROM flight_data_table 
                    GROUP BY DEST_COUNTRY_NAME""")

dfWay = flight_data.groupBy("DEST_COUNTRY_NAME").count()

In [31]:
sqlWay.explain()
dfWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#145], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#145, 10), ENSURE_REQUIREMENTS, [plan_id=128]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#145], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#145] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/billy/Documents/Spark_concepts/data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#145], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#145, 10), ENSURE_REQUIREMENTS, [plan_id=141]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#145], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#145] Batched: false, DataFilters: [], Format: CSV, Location: InM
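
Both plans have the same shape: a partial count per partition, an exchange that hash-partitions the rows by DEST_COUNTRY_NAME, and a final count. The pure-Python sketch below imitates that three-step shape on hypothetical data; it illustrates the idea and is not how Spark implements it.

```python
from collections import Counter

# Hypothetical input: two partitions of DEST_COUNTRY_NAME values.
partitions = [
    ["United States", "Egypt", "United States"],
    ["Egypt", "United States", "Senegal"],
]
num_shuffle_partitions = 2  # plays the role of spark.sql.shuffle.partitions

# Step 1 (partial_count): each partition counts its own rows locally.
partials = [Counter(part) for part in partitions]

# Step 2 (Exchange hashpartitioning): send each key's partial counts to one bucket.
buckets = [[] for _ in range(num_shuffle_partitions)]
for partial in partials:
    for key, n in partial.items():
        buckets[hash(key) % num_shuffle_partitions].append((key, n))

# Step 3 (final count): each bucket sums the partial counts for its keys.
final = {}
for bucket in buckets:
    for key, n in bucket:
        final[key] = final.get(key, 0) + n

print(sorted(final.items()))
```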

In [33]:
spark.sql(""" SELECT max(count) from flight_data_table""").take(1)

[Row(max(count)=370002)]

In [34]:
from pyspark.sql import functions as F  # import the module as F so Python's built-in max is not shadowed

In [36]:
flight_data.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [40]:
maxSql = spark.sql("""SELECT 
            DEST_COUNTRY_NAME, 
            SUM(count) AS destination_total FROM flight_data_table  
            GROUP BY DEST_COUNTRY_NAME 
            ORDER BY destination_total DESC LIMIT 5""")

In [41]:
maxSql.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#202L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#145,destination_total#202L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#145], functions=[sum(count#147)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#145, 10), ENSURE_REQUIREMENTS, [plan_id=223]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#145], functions=[partial_sum(count#147)])
            +- FileScan csv [DEST_COUNTRY_NAME#145,count#147] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/billy/Documents/Spark_concepts/data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
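
The TakeOrderedAndProject node at the top of the plan combines the ORDER BY and the LIMIT, so Spark does not need to sort every aggregated row. The idea can be sketched in pure Python, under the assumption that each partition keeps only its own top rows before a final merge; the partition layout and the smaller totals below are hypothetical.

```python
import heapq

# Hypothetical per-partition (destination, total) pairs after aggregation.
partitions = [
    [("United States", 411352), ("Japan", 1548), ("Egypt", 15)],
    [("Canada", 8399), ("Mexico", 7140), ("Senegal", 40)],
]
limit = 2

# Each partition keeps only its own top `limit` rows.
local_tops = [
    heapq.nlargest(limit, part, key=lambda row: row[1])
    for part in partitions
]

# The final step merges those short lists and takes the overall top `limit`.
merged = [row for top in local_tops for row in top]
result = heapq.nlargest(limit, merged, key=lambda row: row[1])
print(result)
```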




In [42]:
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

