In [None]:
"""PySpark DataFrames are similar to pandas DataFrames.
The key difference in Spark is how the data is distributed:
pandas operates on a single compute instance,
while PySpark distributes data across multiple instances."""

In [None]:
""" PySpark setup:

*** 1) Install PySpark by running "pip install pyspark" in a terminal.
*** 2) Install "Adoptium Java 17", which is compatible with PySpark 3.4 and later.
*** 3) Set the JAVA_HOME system environment variable to the "Adoptium Java 17" installation path.
*** 4) Run the PySpark test code below to check whether it is working.
"""

# Smoke test: start a single-threaded local session, display a tiny
# DataFrame holding the integers 0-4, then shut the session down again.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("QuickTest")
    .master("local[1]")
    .getOrCreate()
)

# Suppress INFO/WARN log chatter so only errors reach the notebook output.
spark.sparkContext.setLogLevel("ERROR")

df = spark.range(5)
df.show()

spark.stop()

# If this cell finishes without raising, PySpark is installed and working.

In [4]:
# Create (or reuse) a SparkSession through the builder.
from pyspark.sql import SparkSession

# getOrCreate() returns the active session if there is one; otherwise it starts a
# new session named "MySparkApp" (e.g. after the smoke-test cell called spark.stop()).
my_spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

# Later cells refer to the session as `spark`; bind that name to this live session
# so they do not keep using a session that the smoke test already stopped.
spark = my_spark
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x0000028E5D999400>


In [None]:
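"""Creating DataFrames from various data sources:

spark.read.csv("path/file_name.csv")               => Read a CSV file (Comma-Separated Values)
spark.read.json("path/file_name.json")             => Read a JSON file (JavaScript Object Notation)
spark.read.parquet("path/file_name.parquet")       => Read a Parquet file
spark.read.schema(schema).csv("path/...")          => Set an explicit schema (StructType or DDL string) before reading, instead of inferring it
spark.read.jdbc(url, "table_name", properties=...) => Read a table from a database over JDBC
"""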

In [None]:
# Create a DataFrame from a CSV file.
# General form: df = spark.read.csv("path", header=True, inferSchema=True)
#   header=True      -> use the first line of the file as column names
#   inferSchema=True -> scan the data to guess column types (otherwise every column is a string)

# The second positional argument of csv() is `schema`, which must be a StructType or a
# DDL string, so a plain list of column names cannot be passed there. Instead, read the
# file and attach the names with toDF().
# NOTE(review): assumes census.csv has no header row and exactly these five columns.
census_columns = ["gender", "age", "zipcode", "salary_range_usd", "marriage_status"]
census_df = spark.read.csv("census.csv", header=False, inferSchema=True).toDF(*census_columns)

# Show the first 5 rows of the DataFrame (show() with no argument prints 20)
census_df.show(5)

# Create a DataFrame from in-memory data; createDataFrame() requires the data argument
DataFrame_Name = spark.createDataFrame(
    [("Alice", 34), ("Bob", 45)],
    schema=["name", "age"],
)
# Inspect and show the schema (column names and types)
DataFrame_Name.printSchema()


In [None]:
"""Operations with PySpark DataFrames:

.select()                                       => to select columns
.filter()                                       => to filter rows based on a condition
.where()                                        => alias of filter(); same behavior, SQL-style name
.groupBy().Aggregation_Type()                   => to group and aggregate with a single function (Aggregation_Type: sum(), min(), max(), avg(), count(), ... *** all lowercase)
.groupBy().agg()                                => to group and apply several aggregations / columns at once, e.g. agg({"col": "avg"})
.sort()                                         => to sort rows; accepts one or more columns
.orderBy()                                      => alias of sort(); also accepts one or more columns
df1.join(df2, on="column", how="join_type")     => to join two DataFrames on a key column with the same name. join_type is e.g. inner, left, right, outer (full), left_semi, left_anti, cross.
df1.join(df2, df1.column1 == df2.column2, "join_type")  => to join two DataFrames on key columns with different names
df1.union(df2)                                  => to combine and append rows from two DataFrames (**with the same schema**; columns are matched by position, use unionByName() to match by name)
"""


# Some examples:
# (df, DataFrame_Name, ca_salaries_df, salaries_df, df1 and df2 stand for DataFrames
#  loaded earlier; column names such as 'Column_Name1' are illustrative placeholders.)

# Select and show
df.select("name", "age").show()

# Filter and show
df.filter(df["age"] > 30).show()

# Where (an alias of filter) and show
df.where(df["age"] == 30).show()

# Count and print using an f-string
row_count = DataFrame_Name.count()
print(f'Number of rows: {row_count}')

# GroupBy on one column with a single aggregation method (sum, min, max, avg, count, ...)
DataFrame_Name.groupBy('Column_Name1').avg('Column_Names').show()

# GroupBy with agg(): pass a dict mapping column name -> aggregate function name;
# several columns can be aggregated at once this way
DataFrame_Name.groupBy('Column_Name1').agg({'Column_Names': 'avg'}).show()

# Average "salary" for both "entry level" and "Canada" (two filters and one groupBy) then show
CA_jobs = (
    ca_salaries_df
    .filter(ca_salaries_df['company_location'] == "CA")
    .filter(ca_salaries_df['experience_level'] == "EN")
    .groupBy()
    .avg("salary_in_usd")
)
CA_jobs.show()

# Sort descending then show
df.sort("age", ascending=False).show()

# Filter rows with column "salary" above 500
# (transformations return a new DataFrame; an action such as show() displays it)
df.filter(df['salary'] > 500).show()

# GroupBy column "department" and calculate the average of column "salary"
df.groupBy("department").avg("salary").show()

# Join two DataFrames whose key columns have different names: compare them with ==
df1.join(df2, df1.ID1 == df2.ID2, "inner")

# Filter and groupBy to find the maximum salary for large companies
salaries_df.filter(salaries_df.company_size == "L").groupBy().max("salary_in_usd").show()

# Filter and groupBy to find the average salary at large US companies
large_companies = (
    salaries_df
    .filter(salaries_df.company_size == "L")
    .filter(salaries_df.company_location == "US")
    .groupBy()
    .avg("salary_in_usd")
)
large_companies.show()

In [None]:
"""Data Manipulation with PySpark DataFrames

.na.drop()                                              => to remove rows with null values
.drop("column_name1","column_name2",...)                => to remove unnecessary columns
col("column_name").isNotNull()                          => a Column condition that is true for non-null values; use it in filter()/where() to filter out nulls
.na.fill({"column_name":value})                         => to replace nulls in a column with a specific value
.withColumn("new_column_name",expression)               => to add a new column from an expression (a formula based on other columns)
.withColumnRenamed("old_column_name","new_column_name") => to rename an existing column (existing name first, new name second)

"""
# Note: DataFrames are immutable - each call below returns a NEW DataFrame and leaves
# `df` unchanged. Assign the result (e.g. df_clean = df.na.drop()) to keep it.
from pyspark.sql.functions import col

# Drop rows that contain any null (missing) value
df.na.drop()

# Remove the column named "department"
df.drop("department")

# Keep only rows where a column is not null
df.where(col("Column_Name").isNotNull())

# Fill nulls in column "age" with 0 (null handling lives under df.na)
df.na.fill({"age": 0})

# Create a new column named "age_plus_five"
Newdf = df.withColumn("age_plus_five", df['age'] + 5)

# Rename the column "age" to "years" (arguments: existing name, then new name)
df.withColumnRenamed("age", "years")

In [None]:
"""DataTypes in PySpark DataFrames:

IntegerType()                           => A data type that represents a whole number (4-byte signed integer)
LongType()                              => A data type that represents a large whole number (8-byte signed integer)
FloatType() or DoubleType()             => Data types that represent decimal (floating-point) values
StringType()                            => A data type that represents text (a string)
ArrayType(StringType(), False)          => A data type that represents an array (list); the second argument says whether elements may be null
StructField(name, DataType(), nullable) => Describes one field (column): its name, data type and nullability
StructType([StructField(...), ...])     => A data type that represents a group of related fields (e.g. a whole schema) built from StructFields
MapType(StringType(), StringType())     => A data type that represents key-value pairs, like a dictionary

Function                               Description
---------                              ---------
array()                             => To build an array column from other columns/values (from pyspark.sql.functions)
lit()                               => To create a column holding a constant (literal) value, e.g. df.withColumn("x", lit(1))
"""

# Using StructType() and StructField() to define a schema:
from pyspark.sql.types import (StructType, StructField, IntegerType, StringType, ArrayType)

# Construct the schema: StructField(name, dataType, nullable)
Defiend_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("scores", ArrayType(IntegerType()), True),
])
# Apply the schema when creating a DataFrame from in-memory data (Your_Data is a placeholder)
df1 = spark.createDataFrame(Your_Data, schema=Defiend_schema)
# Apply a schema when reading a CSV file (file name and separator are placeholders).
# NOTE(review): the CSV source cannot hold array columns, so with the ArrayType field above
# this read will be rejected; use a flat schema for CSV, or a source such as JSON/Parquet.
df2 = spark.read.csv("CSV_Name", sep='Seperator', header=True, schema=Defiend_schema)
# Print the schema as a tree
df1.printSchema()

# Add an array column holding the constant value 25.
# array() and lit() are column *functions* (pyspark.sql.functions), not data types.
from pyspark.sql.functions import array, lit
from pyspark.sql.types import MapType
df.withColumn("Column_Name", array(lit(25)))

# Define a map (dictionary-like) field: MapType(keyType, valueType, valueContainsNull)
StructField("ColumnName", MapType(StringType(), StringType(), True))

In [None]:
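"""User Defined Functions (UDFs) in PySpark (similar to applying custom functions in pandas)

***udf() wraps a Python function so it can be applied to DataFrame columns; to call it from SQL queries it must also be registered with spark.udf.register().
***A regular PySpark UDF is called once per row (one value at a time).
"""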
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the function: upper-case a string value, passing nulls through
def to_uppercase(s):
    """Return ``s`` upper-cased, or None when ``s`` is None (a SQL null).

    An empty string stays an empty string instead of being turned into null.
    """
    return s.upper() if s is not None else None

# Wrap the plain Python function as a Spark UDF whose results are StringType
to_uppercase_udf = udf(to_uppercase, returnType=StringType())

# Apply the UDF to column "name", storing the output in a new column "uppered"
df_New = df.withColumn(
    colName="uppered",
    col=to_uppercase_udf(df["name"]),
)

In [None]:
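"""Using pandas_udf in PySpark

***A pandas_udf (vectorized UDF) is applied to DataFrame columns the same way as a udf(); like any UDF, it must be registered with spark.udf.register() to be called from SQL.
***It runs inside Spark jobs on the executors, receiving the data in batches as pandas Series transferred with Apache Arrow.
***pandas_udf applies a vectorized operation to a whole batch of rows at once, whereas a regular PySpark udf processes one row at a time.
***That is why pandas_udf usually outperforms a regular udf on large datasets: less serialization overhead and fast vectorized pandas/NumPy operations.
"""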
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# Define a Pandas UDF that adds 10 to each element in a vectorized way.
# Spark hands the column over in batches as pandas Series; the Series -> Series
# type hints tell Spark this is a scalar (element-wise) pandas UDF.
@pandas_udf(DoubleType())
def add_ten_pandas(column: pd.Series) -> pd.Series:
    """Return ``column + 10`` element-wise for one batch of values."""
    return column + 10

# Apply the UDF. withColumn returns a new DataFrame (df itself is unchanged),
# so keep the result and show that one to see the "10_plus" column.
df_plus_ten = df.withColumn("10_plus", add_ten_pandas(df['value']))
df_plus_ten.show()

In [None]:
"""Resilient Distributed Datasets (RDDs) in PySpark

***RDDs are the core data abstraction of Spark.
***RDDs enable parallelization: data and computation are split into partitions that multiple worker nodes in the same cluster process.
***RDDs are immutable and cannot be changed once created; transformations produce new RDDs.
***Worker nodes process the partitions in parallel, and the results are combined at the end.
***This makes processing of very large datasets (several terabytes) faster.

"""

In [None]:
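"""Creating an RDD

RDD functions:
spark.sparkContext.parallelize(data) => To create an RDD from a local Python collection
collect()            => To return ALL elements of the RDD to the driver as a Python list (use with care on large data)
map()                => To apply a function to every element of the RDD

Converting between DataFrames and RDDs:
.rdd                 => To convert a DataFrame into an RDD (of Row objects)
.toDF()              => To convert an RDD back into a DataFrame

"""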
from pyspark.sql import Row, SparkSession

# Reuse the active Spark session, or start one named "RDD1"
spark = (
    SparkSession.builder
    .appName("RDD1")
    .getOrCreate()
)

# Three-row sample DataFrame with an int column (a), a float column (b) and a string column (c)
df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c='string1'),
        Row(a=2, b=3.0, c='string2'),
        Row(a=4, b=5.0, c='string3'),
    ]
)

# Expose the DataFrame's underlying RDD of Row objects
Rdd1 = df.rdd

# collect() is an action: it returns every element of the RDD to the driver as a list
Result1 = Rdd1.collect()

# Print each collected Row on its own line
for row in Result1:
    print(row)


In [None]:
""" RDD V.S. DataFrames:

RDDs offer a low-level interface, 
providing maximum flexibility. 
You can manipulate data at a granular level, 
but this flexibility comes at the cost of requiring more lines of code for even moderately complex operations. 
One strength of RDDs is their ability to preserve data types across operations. 
However, they lack the schema-aware optimizations of DataFrames, which means operations on structured data are less efficient and harder to express.
While RDDs can scale to handle large datasets, they’re not as optimized for analytics as DataFrames. 
DataFrames are optimized for ease of use, providing a high-level abstraction for working with data. 
DataFrames encapsulate complex computations, making it easier to achieve our objectives with less code and fewer errors. 
One of the standout features of DataFrames is their SQL-like functionality. 
With SQL syntax, even complex transformations and analyses can be performed in just a few lines of code. 
DataFrames come with built-in schema awareness, meaning they contain column names and data types, just like a structured table in SQL. 


    DataFrame                                    RDD
    -----                                        -----
    Ease of use, SQL-like operations             More code, more errors
    Built-in schema (column names and types)     No schema (hard to work with SQL)
    Scales to large data, query-optimized        Scales to large data, no query optimizer
    Good at analytics                            Poor at analytics

"""

In [None]:
""" Introduction to Spark SQL

***Spark SQL is a module of Apache Spark for structured data processing.
***It allows running SQL queries inside data-processing code.
***It combines PySpark and SQL for complex workflows.
***It gives easy access to structured data (as in SQL Server).
***It offers flexibility and speed when working with large datasets.

"""

In [None]:
"""Spark SQL (Similar to Dynamic SQL Query)

Formula ===> spark.sql("Query")
"""

from pyspark.sql import SparkSession

# Reuse the active Spark session or create one for this example
spark = (
    SparkSession.builder
    .appName("Spark SQL Exampple")
    .getOrCreate()
)

# Sample data: (Name, Department, Age) tuples plus the matching column names
data = [
    ("Jack", "HR", 30),
    ("Bob", "IT", 40),
    ("Chat", "Sale", 25),
]
columns = ["Name", "Department", "Age"]

df = spark.createDataFrame(data, schema=columns)

# Make the DataFrame queryable from SQL under the name "People"
df.createOrReplaceTempView("People")

# spark.sql() takes the query text and returns the result as a new DataFrame
result = spark.sql(
    "SELECT Name, Age FROM People WHERE Age > 30"
)

# Display up to 10 rows of the result
result.show(10)

# Display summary statistics (count, mean, stddev, min, max) of the result's columns
result.describe().show()

In [None]:
"""PySpark SQL functions for aggregation

Commonly used functions include:
SUM(), COUNT(), AVG(), MAX(), and MIN()
These can be applied either in SQL queries with spark.sql() or with DataFrame methods such as groupBy().agg()
"""

'PySpark Aggregation\n\nCommonly used functions include:\nSUM(), COUNT(), AVERAGE(), MAX(), and MIN() \nThese are applied using either SQL queries with spark.sql()\n'

In [None]:
"""Combining SQL operations and DataFrame

***By combining these approaches, we get the best of both worlds: 
   the expressiveness of SQL and the programmatic control of the DataFrame
"""

# Example: filter data with the DataFrame API, then aggregate with SQL.

# Self-contained sample employee data. The most recent `df` in this notebook has only
# Name/Department/Age columns, so filtering it on a salary column would fail.
employees_df = spark.createDataFrame(
    [("Jack", "HR", 2500), ("Bob", "IT", 4000), ("Chat", "Sale", 3500), ("Dana", "IT", 5000)],
    schema=["Name", "Department", "Salary"],
)

# Filter with the DataFrame API (column names must match the DataFrame's columns)
filtered_df = employees_df.filter(employees_df["Salary"] > 3000)

# Register the filtered DataFrame as a temporary view so SQL can query it
filtered_df.createOrReplaceTempView("filtered_employees")

# Aggregate with SQL on the view: number of employees per department
spark.sql("""
    SELECT Department, COUNT(*) AS Employee_Count
    FROM filtered_employees
    GROUP BY Department
""").show()

In [None]:
"""Handling data types in aggregations

*** Data type mismatches can lead to errors or unexpected results
*** PySpark provides functions and methods like `cast()` to convert data types before processing
"""

# Example:

# Create a DataFrame whose Salary values are stored as strings
data = [("HR", "3000"), ("IT", "4000"), ("Finance", "3500")]
columns = ["Department", "Salary"]
df = spark.createDataFrame(data, schema=columns)

# Add a new column holding Salary cast from string to integer
df = df.withColumn("Integer_Salary", df['Salary'].cast("int"))

# Aggregate the cast (numeric) column. GroupedData.sum() only accepts numeric columns,
# so summing the original string column "Salary" would fail.
df.groupBy("Department").sum("Integer_Salary").show()


In [None]:
"""RDD Aggregation
*** RDD aggregations are usually written with lambda functions, e.g. reduceByKey(lambda x, y: x + y)
"""
# RDD aggregation that does the same as the previous example

# Map each Row to a (Department, salary) pair. Use the integer column created above so
# the reduce below adds numbers instead of concatenating the "Salary" strings.
rdd = df.rdd.map(lambda row: (row["Department"], row["Integer_Salary"]))

# reduceByKey merges the values of each key pairwise: here, the per-department salary sum
rdd_aggregated = rdd.reduceByKey(lambda x, y: x + y)

# collect() returns the (Department, total) pairs to the driver as a list
print(rdd_aggregated.collect())

In [None]:
"""Best practices for PySpark aggregations

Here are some best practices for PySpark aggregations:
***Filter early to reduce data size before performing aggregations.
***Ensure data is clean and correctly typed.
***Minimize wide operations such as `.groupBy()` that shuffle the entire dataset across the cluster.
***Choose the right API by preferring DataFrames for most tasks due to their optimizations.
***Monitor performance by using `explain()` to inspect the execution plan and optimize accordingly.
"""

In [None]:
"""Optimizing PySpark jobs Method1: Scale

***Optimizing PySpark jobs becomes essential for managing performance, resource usage, and execution speed.
***Methods like `broadcast()` send a full copy of a small dataset to every worker node in the cluster.
***broadcast() is used when joining with a small dataset: each node joins its part of the large dataset locally, minimizing shuffling during the join.

"""
# broadcast(): mark the small DataFrame so Spark ships a full copy to every executor
# and joins locally, avoiding a shuffle of the large DataFrame
from pyspark.sql.functions import broadcast

joined_df = large_df.join(broadcast(small_df), on="key_column", how="inner")

In [None]:
"""Optimizing PySpark jobs Method2: Execution plans

***First, we’ll explore how to interpret Spark execution plans to identify performance bottlenecks
***We can inspect and understand Spark execution plans process on its distributed cluster using the `.explain()` method. 
***The `explain()` method details the logical and physical plans for optimization.
***By analyzing Spark execution plans, we can spot inefficiencies, like redundant shuffles or unoptimized joins, and address them before running the job.

"""
# explain() prints the physical plan Spark will run (explain(True) also prints the logical plans).
# Self-contained sample data: the most recent `df` in this notebook (Department/Salary)
# has no Age column, so filtering it on Age would fail.
people_df = spark.createDataFrame(
    [("Jack", "HR", 30), ("Bob", "IT", 45), ("Chat", "Sale", 25)],
    schema=["Name", "Department", "Age"],
)
people_df.filter(people_df.Age > 40).select("Name").explain()

In [None]:
"""Optimizing PySpark jobs Method3: Caching or Persisting

***Second, we’ll discuss caching and persisting DataFrames, which can significantly speed up iterative queries.
***When working with large datasets, we often reuse intermediate results across multiple operations.
***Recomputing these results can be costly, especially when reading from disk.
***To avoid this, we have two tools that keep data readily available: caching and persisting.
***The `.cache()` method stores a DataFrame with the default storage level (for DataFrames this uses memory and disk) for fast reuse.
***The `.persist()` method offers more flexibility by letting us choose the storage level.

Caching     => Keep data at the default storage level for fast reuse; best when the data fits in memory
Persisting  => Keep data at a chosen storage level (e.g. MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY), which helps with larger datasets

"""

# Cache the DataFrame. cache() is persist() with the default storage level; it is lazy,
# so the data is materialized by the first action that uses it.
df.cache()
# Then perform the desired operations on the cached DataFrame, e.g.:
df.filter(df["column1"] > 50).show()
# Release the cached data when it is no longer needed
df.unpersist()


# Persist the DataFrame with an explicit storage level
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
# Then perform transformations on it, e.g. a per-group sum of column3
result = df.groupBy("column2").agg({"column3": "sum"})
# Transformations are lazy: run an action while the data is still persisted
result.show()
# Release the persisted data when it is no longer needed
df.unpersist()

"""
***The persist method is especially useful for long-running jobs on large clusters.
***However, caching consumes memory, so it’s important to uncache data when it’s no longer needed, using the `.unpersist()` method.
***In this example (MEMORY_AND_DISK), if the DataFrame doesn’t fit in memory, Spark writes the overflow to disk.
***This ensures our operations can still proceed without crashing due to resource constraints.
***You will encounter it as you work with PySpark in the real world.

"""

In [None]:
"""Optimizing PySpark jobs Method4: Using best practices

Here are a few best practices to optimize PySpark jobs:

1) Prefer narrow transformations:
Choose operations like map() or filter(), which work on each partition independently, over wide operations like groupBy() that require shuffles across the whole dataset.

2) Broadcast Joins:
When joining with a small dataset, use broadcast() to copy it to all nodes and avoid shuffling the large one.

3) Avoid Repeated Actions: Operations like count() or show() trigger jobs. Cache or store intermediate results to prevent recomputation.

"""

In [None]:
# Optimizing PySpark example

# Sample data: (Name, Department, Age, Salary) rows plus the matching column names
data = [
    ("Jack", "HR", 30, 1000),
    ("Bob", "IT", 40, 1500),
    ("Chat", "Sale", 25, 2000),
]
columns = ["Name", "Department", "Age", "Salary"]

# Build the DataFrame and mark it for caching (lazy until an action runs)
df = spark.createDataFrame(data, schema=columns)
df.cache()

# Per-department salary total (still a lazy transformation)
agg_result = df.groupBy("Department").sum("Salary")

# Print the physical plan; the cached data appears in it as an InMemoryRelation
agg_result.explain()

# Drop the cached data again; as the last expression of the cell, the returned DataFrame is displayed
df.unpersist()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Department#203], functions=[sum(Salary#205L)])
   +- Exchange hashpartitioning(Department#203, 200), ENSURE_REQUIREMENTS, [plan_id=92]
      +- HashAggregate(keys=[Department#203], functions=[partial_sum(Salary#205L)])
         +- InMemoryTableScan [Department#203, Salary#205L]
               +- InMemoryRelation [Name#202, Department#203, Age#204L, Salary#205L], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Scan ExistingRDD[Name#202,Department#203,Age#204L,Salary#205L]




DataFrame[Name: string, Department: string, Age: bigint, Salary: bigint]

In [None]:
"""Advanced topics you can explore as you continue your PySpark journey (not covered yet):

***advanced cluster configuration (cluster management)
***performance optimization (Spark job optimization)
***big data applications (big data fundamentals with PySpark)
***streaming data processing with PySpark (Clean data with PySpark)
***Spark's machine learning pipelines and integration with cloud-based tools. Whether you’re a data engineer, a data scientist, or a machine learning engineer, you now have the skills to leverage PySpark for managing and analyzing big data."""