In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os

In [None]:
!apt-get update
!apt-get install openjdk-11-jdk -y
!pip install pyspark

In [3]:

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.11/dist-packages/pyspark"

In [4]:
!pip install pyngrok
from pyngrok import ngrok

Collecting pyngrok
  Downloading pyngrok-7.2.7-py3-none-any.whl.metadata (9.4 kB)
Downloading pyngrok-7.2.7-py3-none-any.whl (23 kB)
Installing collected packages: pyngrok
Successfully installed pyngrok-7.2.7


In [5]:
#bring in Pyspark functions into your session
from pyspark.sql.functions import *

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

•	Reading and Writing Data

In [7]:


# Create simple dataframe
df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "value"])
df.show()

+---+-----+
| id|value|
+---+-----+
|  1|  foo|
|  2|  bar|
+---+-----+



In [11]:
mydata = spark.read.format("csv").option("header","true").load("original.csv")

In [8]:
# prompt: import json file

import json
df2 = spark.read.json('people.json')
df2.show()




+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



Data Checking/cleaning & *Validation*

In [12]:
mydata.printSchema()

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- City: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [17]:
#Exploratory Functions


df.describe()
df.summary()
mydata.columns
df2.dtypes


DataFrame[summary: string, id: string, value: string]

In [22]:
#Null Checking & Missing Value Analysis
mydata.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in mydata.columns
]).show()


mydata.filter(mydata.col.isNull())
#Drop Rows with Any Null
mydata.na.drop()
mydata.na.fill(0)




DataFrame[id: string, first_name: string, last_name: string, gender: string, City: string, JobTitle: string, Salary: string, Latitude: string, Longitude: string]

In [24]:
#Replace Nulls Only in Selected Columns

df_filled = df2.fillna(0, subset=['age'])
df_filled .show()

+---+-------+
|age|   name|
+---+-------+
|  0|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+



In [25]:
# Duplicate Detection
mydata.dropDuplicates()

mydata.groupBy(mydata.columns) \
  .count() \
  .filter("count > 1") \
  .show()


+---+----------+---------+------+----+--------+------+--------+---------+-----+
| id|first_name|last_name|gender|City|JobTitle|Salary|Latitude|Longitude|count|
+---+----------+---------+------+----+--------+------+--------+---------+-----+
+---+----------+---------+------+----+--------+------+--------+---------+-----+



Data Mining & Feature Engineering

In [None]:
#Filtering and Conditions
filter()
where()


chained conditions using &, |, ~

In [None]:

from pyspark.sql.functions import col


In [None]:
data = [
    (1, "Alice", 29),
    (2, "Bob", 35),
    (3, "Eve", 25),
]

df = spark.createDataFrame(data, ["id", "name", "age"])

# Simple condition: age > 30 OR name == "Eve"
filtered_df = df.filter((col("age") > 30) | (col("name") == "Eve"))



In [None]:
filtered_df.show()

In PySpark, you can use chained conditions with the bitwise operators:
•	& for AND
•	| for OR
•	~ for NOT
These are used inside filter() or where() clauses, and each condition must be enclosed in parentheses to avoid operator precedence issues.


In [None]:

from pyspark.sql.functions import col



In [None]:
# Sample data
data = [
    (1, "Alice", 29),
    (2, "Bob", 35),
    (3, "Charlie", 30),
    (4, "Diana", 40),
    (5, "Eve", 25)
]

# Create DataFrame
df = spark.createDataFrame(data, ["id", "name", "age"])


In [None]:
# Chained filter with &, |, ~
filtered_df = df.filter(
    ((col("age") > 30) & (col("name") != "Diana")) | (~(col("age") < 30))
)

# Show result
filtered_df.show()
