In [None]:
                                                            # Structured APIs
# Chapter 4. Structured API Overview
'''
The Structured APIs are a tool for manipulating all sorts of data, from unstructured log files to semi-structured CSV files and highly structured Parquet files. 
These APIs refer to three core types of distributed collection APIs:
1. Datasets
2. DataFrames
3. SQL tables and views

Note: Spark is a distributed programming model in which the user specifies transformations. Multiple transformations build up a directed acyclic graph of instructions. 
An action begins the process of executing that graph of instructions, as a single job, by breaking it down into stages and tasks to execute across the cluster. 
The logical structures that we manipulate with transformations and actions are DataFrames and Datasets.
To create a new DataFrame or Dataset, you call a transformation. To start computation or convert to native language types, you call an action.

DataFrames and Datasets: DataFrames and Datasets are (distributed) table-like (i.e. structured) collections with well-defined rows and columns.
Tables and views are basically the same thing as DataFrames. We just execute SQL against them instead of DataFrame code.

Schemas: A schema defines the column names and types of a DataFrame. You can define schemas manually or read a schema from a data source (often called schema on read). Schemas consist of
types, meaning that you need a way of specifying what lies where.

DataFrames vs. Datasets: DataFrames are "untyped": their columns do have types, but Spark checks them against the schema only at runtime. Datasets are typed: the row type must be
declared up front and is checked at compile time. Therefore, with schema-on-read, if the schema is inferred from only a few records, the inferred type may not match the values
seen later at runtime — e.g., the first 5 records of a column may all be decimals while later records contain characters. Datasets are only available in Scala and Java (JVM-based languages). 

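Spark has its own internal data types (e.g., ByteType, IntegerType, LongType, DateType), because it is effectively a programming language of its own: code written in any supported language is translated into Spark's types and operations.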

Overview of Structured API Execution: 
1. Write DataFrame/Dataset/SQL Code.
2. If valid code, Spark converts this to a Logical Plan.
3. Spark transforms this Logical Plan to a Physical Plan, checking for optimizations along
the way.
4. Spark then executes this Physical Plan (RDD manipulations) on the cluster.

Execution: Upon selecting a physical plan, Spark runs all of this code over RDDs, the lower-level programming interface of Spark. Spark performs further
optimizations at runtime, generating native Java bytecode that can remove entire tasks or stages during execution. Finally the result is returned to the user.
'''

In [None]:
# Chapter 5. Basic Structured Operations

from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that uses every available core,
# then show it as the cell output (session summary / Spark UI link).
spark = (
    SparkSession.builder
    .appName("SparkInJupyter")
    .master("local[*]")
    .getOrCreate()
)

spark

In [None]:
# Read the 2015 flight summary once and reuse the DataFrame for its schema; calling
# spark.read again just to get .schema would repeat the JSON schema-inference pass.
# The relative path is resolved against the driver's current working directory
# (normally the directory the notebook was started from) — check it if the load fails.
df = spark.read.format("json").load("./data/flight-data/json/2015-summary.json")
df.schema

In [None]:
# Creation of DataFrames: different ways of creating a DataFrame in PySpark.
# 1. From a list of tuples (or Rows), naming the columns explicitly:
data = [
    ("Alice", 34),
    ("Bob", 45),
    ("Charlie", 29),
]
columns = ["Name", "Age"]

# The second positional argument of createDataFrame is the schema (here just column names).
df = spark.createDataFrame(data, columns)
df.show()
#2. From an Existing RDD.
'''
Using toDF(): A quick way to convert an RDD.

Using createDataFrame(rdd): Gives you more control over the schema.

3. Reading from Files:
spark.read.csv("path.csv", header=True)
spark.read.parquet("path.parquet")
spark.read.json("path.json")

4. from Pandas DataFrame:

import pandas as pd

pdf = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
sdf = spark.createDataFrame(pdf)

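5. From an External Database (in real code, read the user and password from environment variables or a secrets manager instead of hardcoding them):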
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/db") \
    .option("dbtable", "employees") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

6. Using Spark SQL Queries:
spark.sql("CREATE OR REPLACE TEMPORARY VIEW people AS SELECT 'John' as name, 30 as age")
df = spark.sql("SELECT * FROM people WHERE age > 25")

'''

In [None]:
'''
A schema is a StructType made up of a number of fields, StructFields, that have a name,
type, a Boolean flag which specifies whether that column can contain missing or null values,
and, finally, users can optionally specify associated metadata with that column. The metadata is a
way of storing information about this column (Spark uses this in its machine learning library).
'''
from pyspark.sql.types import StructField, StructType, StringType, LongType

# Declare the schema explicitly instead of letting Spark infer it from the JSON.
# Each StructField is (name, dataType, nullable[, metadata]).
myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), nullable=True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), nullable=True),
    StructField("count", LongType(), nullable=False, metadata={"hello": "world"}),
])

df = (
    spark.read
    .format("json")
    .schema(myManualSchema)
    .load("./data/flight-data/json/2015-summary.json")
)
df.show(1)


In [None]:
# Columns: a column can be defined on its own (as an unresolved expression), but its values
# only exist as part of a row, and therefore as part of a DataFrame.
from pyspark.sql.functions import col, column, expr, lit, concat, when

# col() and column() are equivalent ways to reference a column by name. The name is not
# checked here; it is resolved only when the expression is used against a DataFrame.
print(col("someColumnName"), column("someColumnName"))

# An expression is a set of transformations on one or more values in a record in a DataFrame.
# Like col(), it is only parsed here; someCol/otherCol are resolved later.
print(expr("(((someCol + 5) * 200) - 6) < otherCol"))

# Demonstration of modularity: define column logic up front, outside any DataFrame.
# 1. These are just "blueprints" for calculations.
is_adult_col = (col("age") >= 18)

status_desc_col = (
    when(col("age") >= 18, "Adult")
    .otherwise("Minor")
    .alias("life_stage")
)

# Cast age explicitly so the string concatenation does not rely on implicit type coercion.
full_info_col = concat(col("name"), lit(" - "), col("age").cast("string")).alias("profile_summary")

# 2. Dummy data. Reuse the session created at the top of the notebook: calling
# SparkSession.builder.appName(...).getOrCreate() again would only return that existing
# session and cannot rename the already-running application.
data = [("Alice", 25), ("Bob", 12), ("Charlie", 30)]
df = spark.createDataFrame(data, ["name", "age"])

# 3. Apply the pre-defined columns; the names inside col() are resolved against df's schema now.
final_df = df.select(
    "name",
    "age",
    is_adult_col.alias("is_adult"),
    status_desc_col,
    full_info_col,
)

final_df.show()

# Accessing a DataFrame's column names (of the flight data, not the demo df above)
spark.read.format("json").load("./data/flight-data/json/2015-summary.json").columns

In [None]:
# Rows: a Row is an ordered record whose values can be accessed by position.

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
  StructField("some", StringType(), True),
  StructField("col", StringType(), True),
  StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
print('Row Values: ', myRow[0], myRow[2])
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show(1)  # prints up to n (here 1) rows to the console and returns None
# first() returns only the first Row to the driver (take(n) returns a list of n Rows).
# Fetch it once and reuse it rather than running a second Spark job.
first_row = myDf.first()
print(first_row["some"])  # once attached to a schema, Row fields are accessible by column name
myDf.head()  # with no argument, head() also returns just the first Row

In [None]:
# Transformations on DataFrames:
'''
We can add rows or columns
We can remove rows or columns
We can transform a row into a column (or vice versa)
We can change the order of rows based on the values in columns
'''

In [None]:
from pyspark.sql.functions import expr, col, column

# Reload the flight data (an earlier cell reassigned df to a small demo DataFrame).
df = (
    spark.read
    .format("json")
    .load("./data/flight-data/json/2015-summary.json")
)
df.show(1)

# select() with plain column-name strings
df.select("DEST_COUNTRY_NAME").show(2)
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

# The same column referenced three equivalent ways
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
).show(2)

# Renaming inside the expression, then .alias() renaming the result back again
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")).show(2)
df.select(expr("'The '|| DEST_COUNTRY_NAME as `DEST CTY`")).show(1)

# selectExpr is a simple way to build complex expressions that create new DataFrames:
# it takes SQL expression strings, which are valid as long as the columns they
# reference resolve.
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

df.selectExpr(
    "*",  # all original columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry",
).show(2)

# Selecting only aggregate expressions collapses the DataFrame to a single summary row.
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)


In [None]:
# Spark SQL: register df as a temporary view in Spark's catalog so SQL can query it by name.
df.createOrReplaceTempView("dfTable")
(
    spark
    .sql("select count from dfTable")
    .show(3)
)

In [None]:
# Adding new columns
from pyspark.sql.functions import lit  # lit wraps a Python value as a literal column

df.select(expr("*"), lit(1).alias("One")).show(2)

df.withColumn("numberOne", lit(1)).show(2)

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)

# Jupyter only displays a cell's LAST expression, so intermediate results below are printed.

# Renaming columns
print(df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns)

# Reserved characters (spaces, dashes) in column names: withColumn takes the new name as a
# plain string, so no escaping is needed there. Inside SQL expression strings
# (expr / selectExpr) the name must be quoted with back-ticks; single quotes would turn it
# into a string literal instead of a column reference.
dfWithLongColName = df.withColumn(
    "This Long Column-Name",
    expr("ORIGIN_COUNTRY_NAME"))

dfWithLongColName.selectExpr(
    "`This Long Column-Name`",
    "`This Long Column-Name` as `new col`").show(2)

print(dfWithLongColName.select(expr("`This Long Column-Name`")).columns)

# Removing columns
print(df.drop("ORIGIN_COUNTRY_NAME").columns)
print(dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns)  # multiple columns

# Changing a column's type (cast); SQL equivalent:
# SELECT *, cast(count as long) AS count2 FROM dfTable
df.withColumn("count2", col("count").cast("long"))


In [None]:
# Filtering: where() and filter() are interchangeable and keep only rows matching the condition.
df.where(col("count") < 2).show(2)
# Several conditions can be chained as separate where() calls or combined with & as below.
df.filter((col("count") < 2) & (col("ORIGIN_COUNTRY_NAME") != "Croatia")).show(2)


In [None]:

# Getting unique rows: distinct() removes duplicate rows over the selected columns.
# Roughly: SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable —
# except that SQL COUNT(DISTINCT ...) skips rows containing a NULL, while
# distinct().count() counts them.
print(df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count())  # 256
df.select("ORIGIN_COUNTRY_NAME").distinct().count()  # 125

In [None]:
# Random samples: draw roughly `fraction` of the rows; the exact count is not guaranteed
# and depends on the seed.
seed = 5
withReplacement = False
fraction = 0.5
print(df.sample(withReplacement=withReplacement, fraction=fraction, seed=seed).count())

# Random splits: break the DataFrame into random, non-overlapping pieces by the given
# weights (e.g. train/test sets for ML). Weights are normalized if they do not sum to 1.
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count()  # expected False: the first split holds ~25% of rows

In [None]:
# Appending rows (union): build a small DataFrame with the SAME schema as df and union it.
# union() matches columns by position, not by name, so the Row values must follow df's
# column order.

from pyspark.sql import Row
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1),
]
# parallelize() turns the driver-side list into an RDD distributed across the workers.
# For a handful of rows this step is optional: createDataFrame also accepts the list directly.
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

(
    df.union(newDF)
    .where("count = 1")
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")
    .show()
)

In [None]:
# Sorting rows: sort() and orderBy() are equivalent; the default order is ascending.
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

from pyspark.sql.functions import desc, asc
# NOTE: expr("count desc") does NOT sort descending — it parses as `count AS desc`
# (the column aliased "desc"), so the result is sorted ascending. Use desc()/asc()
# or Column.desc()/.asc() instead.
df.orderBy(desc("count")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)  # same, using the function forms

# For optimization purposes, it is sometimes advisable to sort within each partition
# (no global ordering, no shuffle) before another set of transformations.
df.sortWithinPartitions("count")

In [None]:
# Limit: restrict how many rows are returned.
from pyspark.sql.functions import desc

df.limit(5).show()
# Sort descending first. expr("count desc") would alias the column as "desc" and sort ascending.
df.orderBy(desc("count")).limit(6).show()

In [None]:
# Repartition and Coalesce:
# Another important optimization opportunity is to partition the data according to some
# frequently filtered columns, which control the physical layout of data across the
# cluster, including the partitioning scheme and the number of partitions.
# repartition() incurs a full shuffle of the data, regardless of whether one is necessary.

print(df.rdd.getNumPartitions())  # 1 (observed for this single small JSON file)
print(df.repartition(5).rdd.getNumPartitions())  # 5
# Column-based partitioning: useful if that column is used for filtering often. Without an
# explicit count, the partition number defaults to spark.sql.shuffle.partitions
# (adaptive query execution may coalesce it).
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME"))

# coalesce() reduces the number of partitions by merging existing ones WITHOUT a full
# shuffle (unlike repartition). Here the data is first shuffled into five partitions by
# destination country name, then merged down to two.
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2).rdd.getNumPartitions()  # 2

In [None]:
# collect():
# Spark maintains the state of the cluster in the driver. There are times when you'll
# want to collect some of your data to the driver in order to manipulate it on your
# local machine.

collectDF = df.limit(10)
print(collectDF.take(5))  # take(n) returns a list of the first n Rows to the driver
collectDF.show()  # this prints it out nicely
collectDF.show(5, False)  # 5 rows, without truncating long values
collectDF.collect()  # returns ALL rows (here at most 10) as a list of Rows on the driver

In [None]:
# toLocalIterator(): like collect(), but returns an iterator that brings rows to the driver
# partition by partition, serially. collect() brings all partitions to the driver at once
# and can therefore cause out-of-memory errors; toLocalIterator() only needs one partition
# at a time on the driver, which makes it the safer alternative for large DataFrames.
from itertools import islice

iter_rows = df.toLocalIterator()

# Process rows one at a time on the driver (e.g. calling an external API per row).
# df is the flight-data DataFrame, so use one of its real columns (it has no "email"
# column). islice caps the demo output at 5 rows; remove it to iterate over everything.
for row in islice(iter_rows, 5):
    dest_country = row["DEST_COUNTRY_NAME"]
    print(f"Processing data for: {dest_country}")