### Lazy Evaluation Demonstration

The output of the cell below (`PythonRDD[...]`) means that the RDD has been defined, but the actual data has not been computed yet because no action (such as `collect()` or `count()`) has been called on it.

In [1]:
from pyspark import SparkConf, SparkContext

# Reuse the active SparkContext if one already exists, so that re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once".
conf = SparkConf().setMaster('local').setAppName('Lazy on Pyspark')
sc = SparkContext.getOrCreate(conf)

# Input RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
# Keep only even numbers. filter() is a transformation: it is only recorded
# here, nothing is computed yet.
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
# Printing the RDD shows its description, not its elements, because no action has run.
print(filtered_rdd) # Lazy: PythonRDD[?] at RDD at PythonRDD.scala:?

PythonRDD[1] at RDD at PythonRDD.scala:53


In [2]:
# Stop the SparkContext created for the lazy-evaluation demo before the
# DataFrame section starts its own SparkSession.
sc.stop()

### Spark DataFrame

A Spark DataFrame is a distributed, immutable collection of data organized into named columns. It is similar to a table in a relational database or a Pandas DataFrame in Python.

Spark DataFrames are built on top of RDDs (Resilient Distributed Datasets) and provide high-level APIs for structured data processing.

In [3]:
# SparkSession is the single entry point that combines the functionality of
# SparkContext, SQLContext and HiveContext.
from pyspark.sql import SparkSession

# Configure the application name, then create the session
# (or reuse one that is already running).
session_builder = SparkSession.builder.appName("DataFrame Basics")
spark = session_builder.getOrCreate()

#### Read .json file

In [4]:
# Load the JSON file into a DataFrame. No schema is passed, so Spark infers the column types.
df = spark.read.json('Datasets/people.json')
# Action: print the rows as a text table.
df.show()

+--------+----+--------+
|Hometown| age|    name|
+--------+----+--------+
|    NULL|NULL|  Karina|
|    NULL|  23| Giselle|
|    NULL|  22|  Winter|
|  Harbin|NULL|Ningning|
+--------+----+--------+



#### Show df’s schema

In [5]:
# Print each column's name, data type and nullability as a tree.
df.printSchema()

root
 |-- Hometown: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



#### Show columns

In [6]:
df.columns # `columns` is an attribute (a Python list of column names), so no parentheses are needed: ['Hometown', 'age', 'name']

['Hometown', 'age', 'name']

#### Statistical summary of DataFrame

In [7]:
# describe() returns a new DataFrame of summary statistics:
# DataFrame[summary: string, Hometown: string, age: string, name: string]
# Build it once and reuse it, instead of discarding the first result and
# constructing the same summary a second time just to display it.
summary_df = df.describe()
summary_df.show()

+-------+--------+------------------+-------+
|summary|Hometown|               age|   name|
+-------+--------+------------------+-------+
|  count|       1|                 2|      4|
|   mean|    NULL|              22.5|   NULL|
| stddev|    NULL|0.7071067811865476|   NULL|
|    min|  Harbin|                22|Giselle|
|    max|  Harbin|                23| Winter|
+-------+--------+------------------+-------+



#### Define custom schemas for DataFrames

In [8]:
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# One StructField per column: (column name, data type, nullable).
data_schema = [
    StructField('age', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('Hometown', StringType(), True),
]

In [10]:
# Wrap the list of fields in a StructType to obtain a complete schema.
final_struc = StructType(data_schema)

# Read the same file again, this time with the explicit schema instead of
# letting Spark infer one; the column order and types follow final_struc.
df = spark.read.json(
    'Datasets/people.json',
    schema=final_struc,
)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- Hometown: string (nullable = true)



#### Select a column in PySpark

In [11]:
df['name'] # Indexing by name returns a Column expression, not the data itself: Column<'name'>

Column<'name'>

In [12]:
type(df['name']) # A PySpark Column object (an expression), not a DataFrame

In [13]:
# select() is a lazy transformation: it returns a new DataFrame
# (DataFrame[name: string]) without running anything.
name_df = df.select('name')

# Trigger execution with an action.
name_df.show()

# The result of select() is a DataFrame, unlike df['name'] which is a Column.
type(name_df)

+--------+
|    name|
+--------+
|  Karina|
| Giselle|
|  Winter|
|Ningning|
+--------+



#### head(n) to select the first n rows

In [14]:
df.head(2) # Returns the first 2 rows as a Python list of Row objects

[Row(age=None, name='Karina', Hometown=None),
 Row(age=23, name='Giselle', Hometown=None)]

In [15]:
# head(2) returns a plain Python list, so a single Row object can be picked out by its index.

df.head(2)[0] # pyspark.sql.types.Row

Row(age=None, name='Karina', Hometown=None)

#### Select multiple columns

In [16]:
# Column names can be passed as separate arguments; the result is still lazy.
df.select('age', 'name')  # DataFrame[age: int, name: string]

DataFrame[age: int, name: string]

In [17]:
# Project the two columns, then display them with an action.
age_name_df = df.select('age', 'name')
age_name_df.show()

+----+--------+
| age|    name|
+----+--------+
|NULL|  Karina|
|  23| Giselle|
|  22|  Winter|
|NULL|Ningning|
+----+--------+



#### Create a new column using withColumn()

In [18]:
df.withColumn('double_age', df['age']*2).show() # The change is temporary: the new DataFrame is displayed but not stored, so df is unchanged

+----+--------+--------+----------+
| age|    name|Hometown|double_age|
+----+--------+--------+----------+
|NULL|  Karina|    NULL|      NULL|
|  23| Giselle|    NULL|        46|
|  22|  Winter|    NULL|        44|
|NULL|Ningning|  Harbin|      NULL|
+----+--------+--------+----------+



In [19]:
# withColumn() is not an in-place operation: it returns a new DataFrame, so the
# result must be assigned to a variable to keep it.

df.withColumn('double_age', df['age']*2).show() # Displayed only; the result is not stored
df.show() # df still has its original three columns

+----+--------+--------+----------+
| age|    name|Hometown|double_age|
+----+--------+--------+----------+
|NULL|  Karina|    NULL|      NULL|
|  23| Giselle|    NULL|        46|
|  22|  Winter|    NULL|        44|
|NULL|Ningning|  Harbin|      NULL|
+----+--------+--------+----------+

+----+--------+--------+
| age|    name|Hometown|
+----+--------+--------+
|NULL|  Karina|    NULL|
|  23| Giselle|    NULL|
|  22|  Winter|    NULL|
|NULL|Ningning|  Harbin|
+----+--------+--------+



In [20]:
# Build the derived column expression first, then attach it under a new name.
doubled_age = df['age'] * 2
# Keep the result in a new variable (df_new); df itself is left untouched.
df_new = df.withColumn('double_age', doubled_age)
df_new.show()

+----+--------+--------+----------+
| age|    name|Hometown|double_age|
+----+--------+--------+----------+
|NULL|  Karina|    NULL|      NULL|
|  23| Giselle|    NULL|        46|
|  22|  Winter|    NULL|        44|
|NULL|Ningning|  Harbin|      NULL|
+----+--------+--------+----------+



#### Rename a column

In [21]:
df.withColumnRenamed('age','my_new_age').show() # withColumnRenamed() returns a new DataFrame (not stored here); show() is the action that displays it

+----------+--------+--------+
|my_new_age|    name|Hometown|
+----------+--------+--------+
|      NULL|  Karina|    NULL|
|        23| Giselle|    NULL|
|        22|  Winter|    NULL|
|      NULL|Ningning|  Harbin|
+----------+--------+--------+



### Spark SQL

- Supports standard SQL syntax.
- Can process structured data like CSV, JSON, Parquet, ORC, and Avro.

#### Using Spark SQL

In [22]:
# Register df as a temporary view named "people" so it can be referenced by name in spark.sql() queries.
df.createOrReplaceTempView("people")

Select all rows

In [23]:
# Query 1: select all rows from the "people" view
spark.sql("SELECT * FROM people").show()

+----+--------+--------+
| age|    name|Hometown|
+----+--------+--------+
|NULL|  Karina|    NULL|
|  23| Giselle|    NULL|
|  22|  Winter|    NULL|
|NULL|Ningning|  Harbin|
+----+--------+--------+



Filter rows

In [24]:
# Query 2: keep only the rows whose age is not NULL.
non_null_age_query = "SELECT * FROM people WHERE age IS NOT NULL"
spark.sql(non_null_age_query).show()

+---+-------+--------+
|age|   name|Hometown|
+---+-------+--------+
| 23|Giselle|    NULL|
| 22| Winter|    NULL|
+---+-------+--------+



Select based on a condition

In [25]:
# spark.sql() returns a DataFrame, so the query result can be stored and reused.
results = spark.sql("SELECT * FROM people WHERE age = 22")
results.show()

+---+------+--------+
|age|  name|Hometown|
+---+------+--------+
| 22|Winter|    NULL|
+---+------+--------+



Close the SparkSession

In [26]:
# Stop this session; the next section creates a new one under a different application name.
spark.stop()

#### Basic Operations

Read CSV file

In [27]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Basic Operations")
    .getOrCreate()
)

# header=True uses the first line as column names;
# inferSchema=True lets Spark work out each column's type from the data.
df = spark.read.csv(
    'Datasets/appl_stock.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



Show the data

In [28]:
# Action: print the first rows of the DataFrame as a text table.
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

Count the number of rows

In [29]:
df.count() # Action: returns the number of rows (1762 for this file)

1762

Display first 3 rows

In [30]:
df.head(3) # Returns the first 3 rows as a Python list of Row objects

[Row(Date=datetime.date(2010, 1, 4), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.date(2010, 1, 5), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.date(2010, 1, 6), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004)]

Filter the data

In [31]:
df.filter("Close > 500").show() # SQL syntax: the condition is passed as a SQL expression string

+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|      Date|              Open|              High|               Low|             Close|   Volume|        Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|2012-02-13|        499.529991|503.83000899999996|497.08998899999995|502.60002099999997|129304000|        65.116633|
|2012-02-14|        504.659988|         509.56002|        502.000008|        509.459991|115099600|        66.005408|
|2012-02-16|        491.500008|        504.890007|         486.62999|502.20999900000004|236138000|        65.066102|
|2012-02-17|        503.109993|507.77002000000005|        500.299995|         502.12001|133951300|        65.054443|
|2012-02-21|506.88001299999996|        514.850021|504.12000300000005|        514.850021|151398800|        66.703738|
|2012-02-22|        513.079994|        515.489983|509.0700230000

Spark SQL API

In [32]:
# Expose the DataFrame to SQL under the view name "apple".
df.createOrReplaceTempView("apple")

# The same filter as df.filter("Close > 500"), written as a full SQL query.
close_above_500_query = "select * from apple where Close > 500"
spark.sql(close_above_500_query).show()

+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|      Date|              Open|              High|               Low|             Close|   Volume|        Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|2012-02-13|        499.529991|503.83000899999996|497.08998899999995|502.60002099999997|129304000|        65.116633|
|2012-02-14|        504.659988|         509.56002|        502.000008|        509.459991|115099600|        66.005408|
|2012-02-16|        491.500008|        504.890007|         486.62999|502.20999900000004|236138000|        65.066102|
|2012-02-17|        503.109993|507.77002000000005|        500.299995|         502.12001|133951300|        65.054443|
|2012-02-21|506.88001299999996|        514.850021|504.12000300000005|        514.850021|151398800|        66.703738|
|2012-02-22|        513.079994|        515.489983|509.0700230000

Select a single column from the filtered rows

In [33]:
# Filter first, then keep only the closing price. The column is referenced as 'close'
# (lower case) and the output header follows the name as written in select().
df.filter("Close > 500").select('close').show()

+------------------+
|             close|
+------------------+
|502.60002099999997|
|        509.459991|
|502.20999900000004|
|         502.12001|
|        514.850021|
|        513.039993|
| 516.3899769999999|
| 522.4099809999999|
|        525.760017|
|        535.410011|
|        542.440025|
| 544.4699780000001|
|        545.180008|
| 533.1600269999999|
|        530.259987|
| 530.6900099999999|
|        541.989975|
|        545.170021|
|        551.999977|
|        568.099998|
+------------------+
only showing top 20 rows



Select multiple columns from the filtered rows

In [34]:
# Filter first, then project the opening and closing prices.
close_above_500 = df.filter("Close > 500")
close_above_500.select('open', 'close').show()

+------------------+------------------+
|              open|             close|
+------------------+------------------+
|        499.529991|502.60002099999997|
|        504.659988|        509.459991|
|        491.500008|502.20999900000004|
|        503.109993|         502.12001|
|506.88001299999996|        514.850021|
|        513.079994|        513.039993|
|        515.079987| 516.3899769999999|
| 519.6699980000001| 522.4099809999999|
|        521.309982|        525.760017|
|        527.960014|        535.410011|
| 541.5600049999999|        542.440025|
|        548.169983| 544.4699780000001|
|        544.240013|        545.180008|
|        545.420013| 533.1600269999999|
|        523.659996|        530.259987|
| 536.8000030000001| 530.6900099999999|
| 534.6899950000001|        541.989975|
|        544.209999|        545.170021|
| 548.9799879999999|        551.999977|
|        557.540024|        568.099998|
+------------------+------------------+
only showing top 20 rows



Using PySpark Syntax

In [35]:
# Equivalent SQL-string form, shown for comparison:
# df.filter("Close > 500").show() # SQL Syntax

# PySpark syntax: the comparison on df['Close'] builds a Column condition that is passed to filter().
df.filter(df['Close'] > 500).show() #PySpark Syntax

+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|      Date|              Open|              High|               Low|             Close|   Volume|        Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+-----------------+
|2012-02-13|        499.529991|503.83000899999996|497.08998899999995|502.60002099999997|129304000|        65.116633|
|2012-02-14|        504.659988|         509.56002|        502.000008|        509.459991|115099600|        66.005408|
|2012-02-16|        491.500008|        504.890007|         486.62999|502.20999900000004|236138000|        65.066102|
|2012-02-17|        503.109993|507.77002000000005|        500.299995|         502.12001|133951300|        65.054443|
|2012-02-21|506.88001299999996|        514.850021|504.12000300000005|        514.850021|151398800|        66.703738|
|2012-02-22|        513.079994|        515.489983|509.0700230000

Select based on multiple conditions

In [36]:
# Equivalent SQL-string form:
# df.filter("Close < 200 and Open > 200 ").show() #SQL Syntax
# Each comparison must be wrapped in parentheses. Python's & binds more tightly than < and >,
# so the unparenthesised form is parsed as df['close'] < (200 & df['open']) > 200 and fails:
# df.filter(df['close'] < 200 & df['open'] > 200).show() # Py4JError
# Select the rows where the stock opened at more than 200 and closed at less than 200.
df.filter((df['close'] < 200) & (df['open'] > 200)).show()

+----------+------------------+----------+----------+----------+---------+------------------+
|      Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+----------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+----------+------------------+----------+----------+----------+---------+------------------+



You can use the ~ (tilde symbol) to represent NOT in conditions.

In [38]:
# Select the rows where the stock opened at no more than 200 and closed at less than 200.
# ~ negates the parenthesised condition (open > 200).
df.filter((df['close'] < 200) & ~(df['open'] > 200)).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-02-01|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.375532999999997|
|2010-02-03|        195.169994|        200.200003|        194.420004|        199.229994|153832000|25.812148999999998|
|2010-02-04|        196.730003|        198.370001|        191.570005|        192.050003|189413000|         24.881912|
|2010-02-05|192.63000300000002|             196.0|        190.850002|        195.460001|212576700|25.323710000000002|
|2010-02-08|        195.690006|197.88000300000002|      

**Using collect**

The `collect()` method retrieves all the rows of a DataFrame as a Python list of Row objects.

In [39]:
# Display the rows whose High price is exactly 196.0.
df.filter(df['high']== 196.0).show()

+----------+------------------+-----+------------------+----------+---------+------------------+
|      Date|              Open| High|               Low|     Close|   Volume|         Adj Close|
+----------+------------------+-----+------------------+----------+---------+------------------+
|2010-02-01|192.36999699999998|196.0|191.29999899999999|194.729998|187469100|         25.229131|
|2010-02-05|192.63000300000002|196.0|        190.850002|195.460001|212576700|25.323710000000002|
+----------+------------------+-----+------------------+----------+---------+------------------+



In [40]:
# collect() returns the matching rows as a list of Row objects instead of printing a table.
df.filter(df['high']== 196.0).collect()

[Row(Date=datetime.date(2010, 2, 1), Open=192.36999699999998, High=196.0, Low=191.29999899999999, Close=194.729998, Volume=187469100, Adj Close=25.229131),
 Row(Date=datetime.date(2010, 2, 5), Open=192.63000300000002, High=196.0, Low=190.850002, Close=195.460001, Volume=212576700, Adj Close=25.323710000000002)]

Access Row object using an index.

In [41]:
# Keep the collected rows in a Python list so individual rows can be picked out by index.
results = df.filter(df['high']== 196.0).collect()
results[1] # Second matching Row

Row(Date=datetime.date(2010, 2, 5), Open=192.63000300000002, High=196.0, Low=190.850002, Close=195.460001, Volume=212576700, Adj Close=25.323710000000002)

Convert to Python Dictionary using asDict()

In [42]:
# asDict() converts the Row into a regular Python dict keyed by column name.
results[1].asDict()

{'Date': datetime.date(2010, 2, 5),
 'Open': 192.63000300000002,
 'High': 196.0,
 'Low': 190.850002,
 'Close': 195.460001,
 'Volume': 212576700,
 'Adj Close': 25.323710000000002}

In [43]:
# Once converted with asDict(), the values of a Row can be looked up using column names as dictionary keys.
results[1].asDict()['Date'] # datetime.date(2010, 2, 5)

datetime.date(2010, 2, 5)

In [44]:
# Stop the session before the RDD section creates its own SparkContext.
spark.stop()

### More on RDDs

Using groupBy()

In [50]:
from pyspark import SparkConf, SparkContext

# Initialize SparkContext. getOrCreate() reuses an active context if there is
# one, so re-running this cell does not fail with
# "Cannot run multiple SparkContexts at once".
conf = SparkConf().setMaster("local").setAppName("GroupByExample")
sc = SparkContext.getOrCreate(conf)

# Create an RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# Group numbers by even or odd: the lambda computes the key for each element.
# groupBy() is a transformation, so nothing is computed until an action runs.
grouped_rdd = rdd.groupBy(lambda x: "even" if x % 2 == 0 else "odd")

Using MapValues

In [51]:
# Convert grouped results to a more readable format: mapValues(list) turns each
# group's values into a plain list while keeping the key, and collect() brings
# the (key, list) pairs back as a Python list.
result = grouped_rdd.mapValues(list).collect()

In [52]:
# Show each group on its own line as "<key>: <values>".
for key, values in result:
    print(f"{key}: {values}")

odd: [1, 3, 5]
even: [2, 4, 6]


In [53]:
# Stop the SparkContext used for the groupBy example before a SparkSession is created again.
sc.stop()

Example: Create a DataFrame from a list of tuples

In [54]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Basic Operations")
    .getOrCreate()
)

# Sample data: one (name, age) tuple per row.
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
# Column names, in the same order as the tuple fields.
columns = ["Name", "Age"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+

