# Basic Structured Operations

### SparkTypes

In [0]:
from pyspark.sql.types import *      # import all types from pyspark
from pyspark.sql.functions import *  # import all functions from pyspark (note: this shadows Python built-ins such as sum, min and max)

- IntegerType() - Python `int`. Numbers are converted to 4-byte signed integers at runtime. Python has a lenient definition of "integer", so numbers that are too large (outside -2147483648 to 2147483647) will be rejected by Spark SQL if you use `IntegerType()`. It's best practice to use `LongType`.
- LongType() - Python `int`. Numbers are converted to 8-byte signed integers at runtime.
- FloatType() - Python `float`. Numbers are converted to 4-byte single-precision floating-point numbers at runtime.
- DecimalType() - Python `decimal.Decimal`.
- StringType() - Python `str`.
- DateType() - Python `datetime.date`.
- StructType(fields) - Python list or tuple. `fields` is a list of `StructField`s; fields with the same name are not allowed.
- StructField(name, dataType, [nullable]) - the Python value type is the one of the field's data type (for example, `int` for a `StructField` with data type `IntegerType`). The default value of `nullable` is `True`.

## Data Frame & Schema 

- **DataFrame** - consists of a series of records: **rows** (each row represents an individual record) and **columns** (which store the values for each record and can contain computed expressions, i.e. operations on the data).\
A **DataFrame** is a structured collection of data distributed across a cluster. It is built on top of Spark's Dataset API. Every row is an object of type `Row`, and every column represents either a value or a computed expression (such as a transformation).\

**What is a Schema?**\
A **schema** defines the structure of the DataFrame, including:\
- Column names (e.g., "ID", "Name", "Age").\
- Column data types (e.g., "Integer", "String", "Float").\
> Without a schema, Spark will infer the types automatically, which may not always be accurate.




#### What is Partitioning?



Partitioning = dividing data for efficient processing.\
A DataFrame spreads its data across multiple machines (nodes) in a distributed cluster.\
Each machine processes a portion of the data instead of loading everything into a single node.\
This speeds up query execution and spreads memory usage across the cluster.

**Types of Partitioning in Spark**\
A DataFrame's partitioning scheme is either based on column values or non-deterministic:\
**1. Column-Based Partitioning (Deterministic Partitioning)**
  - Data is split based on the values in a specific column, so rows with the same value always land in the same partition.
  - When data is written to storage partitioned by a column, queries that filter on that column only scan the relevant partitions (partition pruning).
  - Example: partition by the "Age" column.
If we range-partition the DataFrame by Age, Spark could store:
  - Partition 1: all records where Age is between 20 and 29
  - Partition 2: all records where Age is between 30 and 39
  - Partition 3: all records where Age is 40 or above

**2. Random Partitioning (Non-Deterministic Partitioning)**
  - Data is spread across partitions without looking at any column's values (for example, round-robin).
  - This is useful when there is no specific column to partition by.
  - It balances the load evenly across the cluster.


### Read JSON

In [0]:
dbutils.fs.ls('FileStore/tables') # Databricks utility that lists the files in the DBFS FileStore, to find each file's path

Out[3]: [FileInfo(path='dbfs:/FileStore/tables/2015_summary.csv', name='2015_summary.csv', size=7080, modificationTime=1741666214000),
 FileInfo(path='dbfs:/FileStore/tables/2015_summary.json', name='2015_summary.json', size=21368, modificationTime=1741692430000),
 FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales-1.csv', name='BigMart_Sales-1.csv', size=869537, modificationTime=1740727435000),
 FileInfo(path='dbfs:/FileStore/tables/BigMart_Sales.csv', name='BigMart_Sales.csv', size=869537, modificationTime=1740138394000),
 FileInfo(path='dbfs:/FileStore/tables/drivers.json', name='drivers.json', size=180812, modificationTime=1740139360000)]

In [0]:
df = spark.read.format('json').load('dbfs:/FileStore/tables/2015_summary.json')
# format('json') tells Spark that the input is a JSON file.
# load('filepath') loads the data from the given file path.
# df is our DataFrame, which contains flight data.

In [0]:
df.printSchema()

# We haven't defined a schema, so Spark infers it from the data in the JSON file.
# Inferring the schema when the data is read is called schema-on-read.

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [0]:
df.schema
# df.schema returns the schema as a StructType made of StructFields. Note: no parentheses, because schema is a property, not a method.

Out[7]: StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

#### Manually Defining a Schema
In production (ETL pipelines), we manually define schemas to:\
✅ Ensure correct data types (avoid wrong inferences).\
✅ Improve performance (avoid slow auto-detection).\
✅ Handle missing data properly (decide if NULLs are allowed)

Use `StructType()` and `StructField()` to define a schema manually:

In [0]:
from pyspark.sql.types import *

my_schema = StructType([
    StructField('DEST_COUNTRY_NAME', StringType(), True),    # allows NULL values
    StructField('ORIGIN_COUNTRY_NAME', StringType(), True),  # allows NULL values
    StructField('count', LongType(), False)                  # does not allow NULL values
])

# StructType([...]): represents the entire schema (like a table structure).
# StructField(name, data_type, nullable): defines each column's name, type, and NULL handling.
# e.g. 'DEST_COUNTRY_NAME', StringType(), True -> a String column that allows NULL values.
# 'count', LongType(), False -> a Long column that does NOT allow NULL values.

Loading JSON with a Defined Schema


In [0]:
df=spark.read.format('json').schema(my_schema)\
    .load('dbfs:/FileStore/tables/2015_summary.json')

In [0]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
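
Notice that `count` still shows `nullable = true` even though `my_schema` declared it `False`: for file-based sources such as JSON, Spark treats every column as nullable when reading, so the flag is not enforced here.
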
### Adding Metadata to the schema
Metadata is extra information about a column, stored in the schema of a DataFrame. It does not change the actual data or the column structure, and it does not affect how Spark reads or processes the data, but it is useful for documentation or machine learning tasks. Examples include column descriptions, data source information, and constraints.\

PySpark stores metadata as a dictionary (key-value pairs) inside each `StructField` of a `StructType`.

 

In [0]:

# Defining a schema with metadata
mySchema_new = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True, metadata={"description": "Destination Country"}),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True, metadata={"description": "Origin Country"}),
    StructField("count", LongType(), False, metadata={"hello": "world", "data_source": "Bureau of Transport Statistics"})
])
#We add metadata when defining a schema manually using the metadata parameter in the StructField constructor.
# metadata={"description": "Destination Country"} → This adds a description to the DEST_COUNTRY_NAME column.

# Applying the schema while reading a JSON file
df_new = spark.read.format("json").schema(mySchema_new).load("dbfs:/FileStore/tables/2015_summary.json")

# Printing schema with metadata
df_new.printSchema()


root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



Access Metadata\
`printSchema()` does not display metadata. To retrieve it, look up the column in `df_new.schema` by name and read its `.metadata` attribute:

In [0]:
df_new.schema["count"].metadata

Out[30]: {'data_source': 'Bureau of Transport Statistics', 'hello': 'world'}

### Columns and Expressions in PySpark
In PySpark, a column is a logical representation of a value that is computed for each row of a DataFrame.\
Columns are referenced with functions such as `col()` or `column()`; `col()` is the most commonly used.


In [0]:
from pyspark.sql.functions import col

# refer to a column by name, for use in expressions
col('count')
# refer to a column of a specific DataFrame (useful when working with multiple DataFrames)
df['count']
# note: df.col('count') is Scala syntax and does not exist in PySpark,
# and df.count is the DataFrame.count() method, not the column

In [0]:
df.columns

# gives list of all coloumn in the dataframe

Out[41]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

#### Expressions in PySpark
- Expressions take column names as input.
- They process the values in each row according to the transformation.
- They return a single calculated value per row.

`expr()` parses a SQL expression string into a column expression. It helps with column transformations, calculations, and conditions.


In [0]:


from pyspark.sql import *
from pyspark.sql.functions import *

# Sample DataFrame
data = [(1, 100), (2, 200), (3, 300)]
df_dummy = spark.createDataFrame(data, ["id", "salary"])

# Using expr() to increase salary by 10%
df_expression = df_dummy.withColumn("new_salary", expr("salary * 1.1"))
df_expression.display()

# withColumn() adds a column to the DataFrame based on the expression (discussed later).


id,salary,new_salary
1,100,110.0
2,200,220.0
3,300,330.0



#### `expr()` and column expressions are interchangeable
`expr("someCol - 5")` is the same transformation as `col("someCol") - 5`, or even `expr("someCol") - 5`, because Spark compiles all of them to the same logical tree specifying the order of operations:
- Columns are just expressions.
- Columns and transformations of those columns compile to the same logical plan as parsed expressions.

In [0]:
df_expression = df_dummy.withColumn("new_salary", expr("salary") * 1.1)

df_expression.display()

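# expr("salary * 1.1") and expr("salary") * 1.1 give almost the same result. Inside the SQL string, 1.1 is parsed as a
# DECIMAL literal, so that arithmetic is exact; the Python float 1.1 becomes a DOUBLE, hence values like 220.00000000000003.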

id,salary,new_salary
1,100,110.0
2,200,220.00000000000003
3,300,330.0


### Records and Rows in Spark
In Apache Spark, a row in a DataFrame represents a single record of data. Each row contains multiple values, one for each column in the DataFrame. Spark stores these rows as Row objects, which are used to manipulate and process data.
Spark stores Row objects internally as arrays of bytes to optimize performance. However, you don’t need to worry about this because Spark provides column expressions to manipulate data, meaning you don’t directly handle byte arrays.


In [0]:
df.first()
# This command returns a Row object, which contains the values of all columns for the first record in the DataFrame.
#Each value corresponds to a column in the DataFrame

Out[48]: Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

### Creating Rows Manually

Use `Row` (imported from `pyspark.sql`) to create a row manually:

In [0]:
myRow = Row("Hello", None, 1, False)

# Accessing data in rows: once you have a Row object, you can access its values by position (indexing)
myRow[0]

Out[80]: 'Hello'

In [0]:
myRow[2]  # the third value (indexing starts from 0)

Out[64]: 1


In [0]:
df.collect()[4][1]

# .collect() returns the entire DataFrame to the driver as a list of Row objects.
# [4] selects the 5th row (indexing starts from 0).
# [1] selects the 2nd column of that row (indexing starts from 0).

Out[70]: 'India'

In [0]:
df.take(5)[4]

# .take(n) retrieves the first n rows of a DataFrame and returns them as a list of Row objects,
# so take(5)[4] is the 5th row.


Out[87]: Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)

In [0]:
df.collect()[4]

# .collect() brings ALL rows to the driver as a list of Row objects, and [4] picks the 5th one.
# Unlike take(n), which fetches only the first n rows, this loads the whole DataFrame into driver memory.

Out[88]: Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)

#### Data Frame Transformations

#### select and selectExpr
When working with PySpark DataFrames, you often need to select specific columns, modify their values, or create new columns. Two important methods to achieve this are:

- **select()** - Used to select specific columns from a DataFrame.
- **selectExpr()** - Used to write SQL-like expressions directly within the selection

In [0]:
df.select('DEST_COUNTRY_NAME').show(5)
#You can select one column from the DataFrame by passing its name as a string.


+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
|    United States|
|            Egypt|
|    United States|
+-----------------+
only showing top 5 rows



In [0]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(5)
#To select multiple columns, just pass multiple column names as strings


+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
|    United States|            Ireland|
|            Egypt|      United States|
|    United States|              India|
+-----------------+-------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import col, column, expr

df.select(
    expr("DEST_COUNTRY_NAME"),  # Using expr
    col("DEST_COUNTRY_NAME"),   # Using col()
    column("DEST_COUNTRY_NAME") # Using column()
).show(2)
# PySpark provides multiple ways to reference a column

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



In [0]:
# In Scala, mixing Column objects (col("column_name")) and strings ("column_name") in one select() call
# does not compile. PySpark accepts both, e.g. df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME"),
# but using one style consistently keeps code readable.

df.select(col("DEST_COUNTRY_NAME"), col("ORIGIN_COUNTRY_NAME")).show(2)


+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



### selectExpr()
`selectExpr()` lets you write SQL expressions directly inside the selection, like the column list of a SQL `SELECT`.

In [0]:
# Renaming Columns Using selectExpr()

df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)


+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [0]:
#You can use selectExpr() to create a new column using an expression.

df.selectExpr(
    "*",  # Select all original columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry" # Boolean column
).show(5)
# SQL equivalent of the above code: SELECT *, (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry FROM dfTable LIMIT 5;

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
|    United States|            Ireland|  344|        false|
|            Egypt|      United States|   15|        false|
|    United States|              India|   62|        false|
+-----------------+-------------------+-----+-------------+
only showing top 5 rows



In [0]:

#You can also apply aggregate functions like avg() and count(distinct) in selectExpr().
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)


# SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) FROM dfTable LIMIT 2;  # SQL- Equivalent of above code


+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



# Understanding Spark Literals, Column Manipulation, and Reserved Characters in PySpark

#### Converting to Spark Types (Literals)


When working with Spark DataFrames, we sometimes need to insert constant values into our dataset. These constant values are called literals in Spark.\
**lit( ) Function** - is used to insert a constant value as a new column or within an expression.

In [0]:
df.select('*').show(10)
# the * wildcard selects all columns of the DataFrame


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



Alias Function - `alias()` gives a column a new name in the result of an expression such as `select()`; the original DataFrame is not changed.


In [0]:
df.select(col("DEST_COUNTRY_NAME").alias("DEST_CON")).show(3)

# alias() is applied to the selected column to rename it in the output


+-------------+
|     DEST_CON|
+-------------+
|United States|
|United States|
|United States|
+-------------+
only showing top 3 rows



In [0]:
df.select('*', lit('New').alias('Constanat')).show(2)

# lit('New') creates a column holding the constant value 'New', and alias() gives that column a name.

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|Constanat|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|      New|
|    United States|            Croatia|    1|      New|
+-----------------+-------------------+-----+---------+
only showing top 2 rows




#### Adding Columns
A new column can be added with the `withColumn()` method, which creates a new column or replaces an existing one.\
It takes two parameters:
- Column name (string)
- Expression (which can be a constant, an existing column, or a calculated value)


In [0]:
df.withColumn("numberOne", lit(1)).show(2)

# adds a column named numberOne, filled with the constant 1 using lit()


+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [0]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)
# uses an expression to compare ORIGIN_COUNTRY_NAME with DEST_COUNTRY_NAME and stores the boolean result in a new column, withinCountry

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



#### Renaming Columns
The withColumnRenamed() method is used to rename an existing column\
It takes two parameters:
- Old column name (string)
- New column name (string)


In [0]:
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
# the output shows that DEST_COUNTRY_NAME has been renamed to dest

Out[31]: ['dest', 'ORIGIN_COUNTRY_NAME', 'count']

Case Sensitivity in Spark - By default, Spark SQL is case-insensitive, meaning that "columnName" and "COLUMNNAME" refer to the same column. You can make it case-sensitive by setting `spark.sql.caseSensitive` to `true`.




In [0]:
df.select('ORIGIN_CouNTRY_NAME').show(5)

+-------------------+
|ORIGIN_CouNTRY_NAME|
+-------------------+
|            Romania|
|            Croatia|
|            Ireland|
|      United States|
|              India|
+-------------------+
only showing top 5 rows



In [0]:
spark.conf.set("spark.sql.caseSensitive", True)
# case sensitivity is set to True


In [0]:
df.select('ORIGIN_CouNTRY_NAME').show(5)

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
----> 1 df.select('ORIGIN_CouNTRY_NAME').show(5)

With case sensitivity turned on, `ORIGIN_CouNTRY_NAME` no longer matches the column `ORIGIN_COUNTRY_NAME`, so Spark raises an `AnalysisException`.

In [0]:
spark.conf.set("spark.sql.caseSensitive", False)
# case sensitivity is set back to False (the default)


#### Removing Columns
When working with DataFrames in Apache Spark (PySpark), sometimes we need to remove specific columns that are not required for analysis
- The drop( ) function is specifically built for removing columns:


In [0]:
df.drop("ORIGIN_COUNTRY_NAME").show(2)
# drops the column that is not required from the DataFrame


+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|   15|
|    United States|    1|
+-----------------+-----+
only showing top 2 rows



In [0]:
# To remove multiple columns, we can pass multiple column names:
df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").show(2)


+-----+
|count|
+-----+
|   15|
|    1|
+-----+
only showing top 2 rows



### Changing a Column’s Type (cast)
Sometimes we need to convert the data type of a column. This is called casting. For example, if a column is stored as `StringType` but we need it as `IntegerType`, we use the column's `cast()` method.


In [0]:
# The inferred count column is already a long; casting it to 'long' again just illustrates the syntax (e.g. int -> long)

df.withColumn('count', col('count').cast('long')).printSchema()


root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



### Filtering Rows
Filtering is used to select only specific rows based on conditions.
- filter()
- where() (works the same as filter(), but is more similar to SQL syntax)


In [0]:
# Filtering rows where count > 2
df.filter(col('count')>2).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|            Grenada|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
# using the where() function
df.where('count>2').show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|            Grenada|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
# Filtering rows where count < 2 and ORIGIN_COUNTRY_NAME is not "Croatia"- Filtering with Multiple Conditions

df.where(col('count')<2).where(col('ORIGIN_COUNTRY_NAME') !='Croatia').show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [0]:
# Filtering with multiple conditions using filter(); each condition needs its own parentheses
df.filter((col('count') < 2) & (col('ORIGIN_COUNTRY_NAME') != 'Croatia')).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



#### Getting Distinct Rows
A common operation on DataFrames is finding the distinct (unique) values in one or more columns.

In [0]:
df.select("ORIGIN_COUNTRY_NAME").distinct().show(5)
# select the ORIGIN_COUNTRY_NAME column and show its distinct values


+-------------------+
|ORIGIN_COUNTRY_NAME|
+-------------------+
|          Singapore|
|      United States|
|              India|
|            Croatia|
|            Ireland|
+-------------------+
only showing top 5 rows



In [0]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()
# select ORIGIN_COUNTRY_NAME, keep its distinct values, and count() them

# SELECT COUNT(DISTINCT ORIGIN_COUNTRY_NAME) FROM dfTable; - SQL equivalent of the above code.


Out[70]: 125

### Understanding Random Splits, Union, Sorting, and Limit in PySpark

**Random Splits (randomSplit())**

When working with machine learning models, we often need to split our dataset into multiple subsets, such as:

- Training Set (used to train the model)
- Validation Set (used to fine-tune the model)
- Test Set (used to evaluate model performance)
PySpark allows us to randomly split a DataFrame into multiple parts using the .randomSplit() function.

- You define a list of proportions that determine the size of each subset.
- You can also set a seed so the randomness is reproducible (you get the same split every time on the same data). 42 is a common convention, but any integer works.




In [0]:
dataFrames = df.randomSplit([0.25, 0.75], seed=42)
#[0.25, 0.75] → The first DataFrame gets approximately 25% of the data, and the second gets 75%.
#seed=42 → Ensures that every time you run this, the split remains the same.
dataFrames[0].count() 
# The result is a list of DataFrames, where dataFrames[0] is the first split and dataFrames[1] is the second split.

Out[82]: 63

In [0]:
dataFrames[0].show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Austria|      United States|   62|
|           Brazil|      United States|  853|
|         Bulgaria|      United States|    3|
|     Cook Islands|      United States|   13|
|       Costa Rica|      United States|  588|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
dataFrames[1].show(5)

+-------------------+-------------------+-----+
|  DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-------------------+-------------------+-----+
|            Algeria|      United States|    4|
|             Angola|      United States|   15|
|           Anguilla|      United States|   41|
|Antigua and Barbuda|      United States|  126|
|          Argentina|      United States|  180|
+-------------------+-------------------+-----+
only showing top 5 rows



### Concatenating and Appending Rows (union())
DataFrames are immutable: you cannot modify them in place. To add new rows, you combine the DataFrame with another one using the `.union()` function.\
With `union()`:

- You combine two DataFrames into one.
- Both DataFrames must have the same number of columns with compatible data types; columns are matched by position, not by name.

In [0]:
# Appending new rows to a DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create new DataFrame with the same schema
newDF = spark.createDataFrame([
    ("New Country", "Other Country", 5),
    ("New Country 2", "Other Country 3", 1)
], df.schema)

# Union and filter in one step
df.union(newDF).filter(col('DEST_COUNTRY_NAME') == 'New Country').show(150)

# PySpark's createDataFrame() method creates a DataFrame:
#   data: the first argument is a list of tuples representing the rows.
#   schema: the second argument is the DataFrame's schema (column structure). Instead of defining column
#   names and data types manually, we reuse the schema of the existing DataFrame (df.schema).



+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    5|
+-----------------+-------------------+-----+



##### unionByName() - combines two DataFrames by matching column names, so the columns can be in a different order and, with `allowMissingColumns=True`, some columns can be missing from one of the DataFrames.
- `df1.unionByName(df2, allowMissingColumns=False)` - with the default `allowMissingColumns=False`, Spark throws an error if a column is missing from one of the DataFrames.
- `allowMissingColumns=True` - combines the DataFrames even if one of them has extra columns; the missing values are filled with `null`.





In [0]:
data1 = [("Alice", 25), ("Bob", 30)]
columns1 = ["Name", "Age"]

data2 = [(40, "Charlie"), (50, "David")]
columns2 = ["Age", "Name"]

df1 = spark.createDataFrame(data1, columns1)
df2 = spark.createDataFrame(data2, columns2)

# UnionByName
df_union = df1.unionByName(df2)
df_union.show()


+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 40|
|  David| 50|
+-------+---+



In [0]:
# Handling Missing Columns
data1 = [("Alice", 25), ("Bob", 30)]
columns1 = ["Name", "Age"]

data2 = [(40, "Charlie", "USA"), (50, "David", "Canada")]
columns2 = ["Age", "Name", "Country"]

df1 = spark.createDataFrame(data1, columns1)
df2 = spark.createDataFrame(data2, columns2)

# Using allowMissingColumns=True
df_union = df1.unionByName(df2, allowMissingColumns=True)
df_union.show()


+-------+---+-------+
|   Name|Age|Country|
+-------+---+-------+
|  Alice| 25|   null|
|    Bob| 30|   null|
|Charlie| 40|    USA|
|  David| 50| Canada|
+-------+---+-------+



### Sorting Rows (sort() and orderBy())
Sorting arranges the rows of a DataFrame in a specific order.
- Using .sort() - sorts the DataFrame in ascending order by default and accepts one or more columns or column expressions.
- Using .orderBy() - an alias of .sort(); the two behave identically, including when sorting by multiple columns.
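When several columns are given, rows are compared on the first column, and the next column is used only to break ties. The plain-Python sketch below mimics `orderBy("count", "DEST_COUNTRY_NAME")` and `orderBy(desc("count"), asc("DEST_COUNTRY_NAME"))` on a few hypothetical `(DEST_COUNTRY_NAME, count)` pairs. Negating the numeric key is just a Python trick for a descending sort, not how Spark does it.

```python
# Hypothetical sample rows: (DEST_COUNTRY_NAME, count)
rows = [("Cyprus", 1), ("Canada", 8399), ("Burkina Faso", 1), ("Djibouti", 1)]

# Like orderBy("count", "DEST_COUNTRY_NAME"): count ascending, ties broken by name.
by_count_then_name = sorted(rows, key=lambda r: (r[1], r[0]))

# Like orderBy(desc("count"), asc("DEST_COUNTRY_NAME")): negating count reverses it.
by_count_desc_then_name = sorted(rows, key=lambda r: (-r[1], r[0]))

print(by_count_then_name)
# [('Burkina Faso', 1), ('Cyprus', 1), ('Djibouti', 1), ('Canada', 8399)]
print(by_count_desc_then_name)
# [('Canada', 8399), ('Burkina Faso', 1), ('Cyprus', 1), ('Djibouti', 1)]
```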



In [0]:
df.sort("count").show(5)  


+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [0]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

#First, sort by count (ascending by default).
#Then, sort by DEST_COUNTRY_NAME (if two rows have the same count value).


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
# If you want to sort in descending order, use the desc() function
df.sort(desc("count")).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [0]:
df.orderBy(desc("count")).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [0]:
# Sorting by multiple columns: count descending, then DEST_COUNTRY_NAME ascending for ties
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(5)  

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



#### Handling NULL Values in Sorting
If your DataFrame contains NULL values, you can control where they appear in sorting:

- **asc_nulls_first** → NULLs come first when sorting in ascending order.
- **asc_nulls_last** → NULLs come last when sorting in ascending order.
- **desc_nulls_first** → NULLs come first when sorting in descending order.
- **desc_nulls_last** → NULLs come last when sorting in descending order.
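The four options differ only in where NULLs go relative to the sorted non-null values. If you do not choose one, Spark SQL puts NULLs first in an ascending sort and last in a descending sort. A small plain-Python model, where `order_with_nulls` is a hypothetical helper and `None` stands in for NULL:

```python
def order_with_nulls(values, descending=False, nulls_first=False):
    """Sort the non-null values, then place every None at the front or the back."""
    non_null = sorted((v for v in values if v is not None), reverse=descending)
    nulls = [v for v in values if v is None]
    return nulls + non_null if nulls_first else non_null + nulls


counts = [15, None, 1, 344]
print(order_with_nulls(counts, nulls_first=True))                    # asc_nulls_first  -> [None, 1, 15, 344]
print(order_with_nulls(counts, nulls_first=False))                   # asc_nulls_last   -> [1, 15, 344, None]
print(order_with_nulls(counts, descending=True, nulls_first=True))   # desc_nulls_first -> [None, 344, 15, 1]
print(order_with_nulls(counts, descending=True, nulls_first=False))  # desc_nulls_last  -> [344, 15, 1, None]
```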

In [0]:
df.orderBy(col("count").desc_nulls_last()).show(5)

#This ensures NULL values appear at the bottom of the results


+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [0]:
df.sortWithinPartitions("count").show(20)


+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Croatia|    1|
|       United States|          Singapore|    1|
|             Moldova|      United States|    1|
|               Malta|      United States|    1|
|       United States|          Gibraltar|    1|
|Saint Vincent and...|      United States|    1|
|            Suriname|      United States|    1|
|       United States|             Cyprus|    1|
|        Burkina Faso|      United States|    1|
|            Djibouti|      United States|    1|
|       United States|            Estonia|    1|
|              Zambia|      United States|    1|
|              Cyprus|      United States|    1|
|       United States|          Lithuania|    1|
|       United States|           Bulgaria|    1|
|       United States|            Georgia|    1|
|       United States|            Bahrain|    1|
|       Cote d'Ivoir

- sort() / orderBy() → global sorting. Sorts the entire DataFrame across all partitions, which requires a full shuffle of the data and is therefore costly on large datasets.
- sortWithinPartitions() → sorting within each partition. Sorts the rows inside each individual partition without moving data between partitions. Because it avoids a full shuffle, it is cheaper than a global sort; use it when you only need rows ordered within each partition rather than a total order across the whole DataFrame.
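A plain-Python model of the difference, with each partition modeled as a list (both helpers are hypothetical). For the global sort, the model gathers every row and cuts the sorted result into equal contiguous ranges; Spark instead samples the data to choose range boundaries, so real partition sizes vary.

```python
def sort_within_partitions(partitions, key):
    # Each partition is sorted on its own; no row moves to another partition.
    return [sorted(p, key=key) for p in partitions]


def global_sort(partitions, key, num_partitions):
    # Gather every row (the "shuffle"), sort, then cut into contiguous ranges,
    # so every value in partition i is <= every value in partition i + 1.
    rows = sorted((r for p in partitions for r in p), key=key)
    if not rows:
        return []
    size = -(-len(rows) // num_partitions)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


parts = [[10, 8, 15], [5], [12, 6]]
print(sort_within_partitions(parts, key=lambda v: v))         # [[8, 10, 15], [5], [6, 12]]
print(global_sort(parts, key=lambda v: v, num_partitions=3))  # [[5, 6], [8, 10], [12, 15]]
```

Reading the partitions of the first result in order gives 8, 10, 15, 5, 6, 12: each partition is sorted, but the DataFrame as a whole is not, which is the same pattern the sortWithinPartitions() output below shows.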


In [0]:

# Sample Data
data = [
    ("USA", "India", 10),
    ("France", "Germany", 5),
    ("India", "USA", 8),
    ("USA", "Canada", 12),
    ("Germany", "France", 6),
    ("Canada", "USA", 15)
]

columns = ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"]

# Create DataFrame
df_data = spark.createDataFrame(data, columns)

# Repartition by ORIGIN_COUNTRY_NAME: splits the data into 3 partitions so that all rows with the same ORIGIN_COUNTRY_NAME end up in the same partition.
df_part = df_data.repartition(3, "ORIGIN_COUNTRY_NAME")


#Sorts data within each partition in ascending order (by default) based on the "count" column
df_part = df_part.sortWithinPartitions("count")

df_part.show()


+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            India|                USA|    8|
|              USA|              India|   10|
|           Canada|                USA|   15|
|           France|            Germany|    5|
|          Germany|             France|    6|
|              USA|             Canada|   12|
+-----------------+-------------------+-----+



#### Limiting Rows (limit())
When working with large datasets, you may want to preview or select only the top N rows. limit(n) returns a new DataFrame containing at most n rows.


In [0]:
# Selecting the Top 5 Rows
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
#Selecting the Top 6 Rows in Descending Order
df.sort(col('count').desc()).limit(6).show()

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
|   United Kingdom|      United States|  2025|
+-----------------+-------------------+------+
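Sorting every row just to keep six would be wasteful. For `orderBy(...).limit(n)` with a small n, Spark's planner typically uses a top-K operator (TakeOrderedAndProject in the physical plan) that keeps only the best n rows of each partition and then merges those candidates. A plain-Python model of that idea using `heapq.nlargest`, where `top_n` is a hypothetical helper and the partitions are made-up sample data:

```python
import heapq


def top_n(partitions, n, key):
    # Step 1: each partition keeps only its own n largest rows (no full sort).
    candidates = [row for p in partitions for row in heapq.nlargest(n, p, key=key)]
    # Step 2: the small candidate set is reduced to the overall n largest rows.
    return heapq.nlargest(n, candidates, key=key)


# Hypothetical partitions of (DEST_COUNTRY_NAME, count) rows
parts = [
    [("United States", 370002), ("Egypt", 15)],
    [("Canada", 8399), ("Ireland", 344)],
    [("Mexico", 7140), ("Malta", 1)],
]
print(top_n(parts, 3, key=lambda r: r[1]))
# [('United States', 370002), ('Canada', 8399), ('Mexico', 7140)]
```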



### Repartition and Coalesce in PySpark

When working with big data in Spark, it's important to control how the data is distributed across different partitions. Partitions are chunks of data that Spark processes in parallel. Spark provides two key functions for managing partitions:

- repartition() – Increases or decreases the number of partitions by performing a full shuffle.
- coalesce() – Decreases the number of partitions without a full shuffle, making it more efficient in some cases.
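The two repartitioning modes used below can be modeled in plain Python (the helpers are hypothetical, not Spark code). `repartition(n)` spreads rows evenly without looking at their values, while `repartition(n, col)` hashes the column value so that equal values always land in the same partition. Spark uses a Murmur3 hash and starts the round-robin at a pseudo-random partition; the model uses `zlib.crc32` and a fixed start so its output is deterministic.

```python
import zlib


def repartition_round_robin(rows, n):
    # Like repartition(n): rows are dealt out in turn, ignoring their values.
    parts = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        parts[i % n].append(row)
    return parts


def repartition_by_key(rows, n, key):
    # Like repartition(n, col): the target partition is hash(key) mod n,
    # so rows with equal keys always end up together.
    parts = [[] for _ in range(n)]
    for row in rows:
        h = zlib.crc32(str(key(row)).encode("utf-8"))
        parts[h % n].append(row)
    return parts


rows = [("USA", "India", 10), ("France", "Germany", 5), ("India", "USA", 8),
        ("USA", "Canada", 12), ("Germany", "France", 6), ("Canada", "USA", 15)]
print([len(p) for p in repartition_round_robin(rows, 4)])  # [2, 2, 1, 1]
for p in repartition_by_key(rows, 3, key=lambda r: r[1]):  # key = ORIGIN_COUNTRY_NAME
    print(p)
```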


In [0]:
# Check the current number of partitions
df.rdd.getNumPartitions()


Out[46]: 5

In [0]:
# Change the number of partitions: this creates 8 partitions and performs a full shuffle of the data
df = df.repartition(8)


In [0]:

# Repartition based on a column: if you frequently filter by a specific column, repartitioning by that column can be useful
df = df.repartition(5, col("ORIGIN_COUNTRY_NAME"))
df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Ireland|  344|
|    United States|          Singapore|    1|
|    United States|   Marshall Islands|   39|
|    United States|             Angola|   13|
|    United States|           Portugal|  134|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
df.rdd.getNumPartitions()


Out[43]: 5

coalesce() - reduces the number of partitions without a full shuffle. It merges existing partitions instead of redistributing all of the data.
- If you only need fewer partitions, prefer coalesce(), as it is cheaper than repartition().
- If you need the data evenly distributed across partitions, use repartition(), at the cost of a full shuffle.
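A plain-Python model of why coalesce() is cheaper (`coalesce` here is a hypothetical helper, not Spark's code): it groups existing partitions together instead of re-hashing every row, and it cannot increase the number of partitions. Spark's real grouping also takes data locality into account; the model simply merges neighboring partitions.

```python
def coalesce(partitions, n):
    if n < 1:
        raise ValueError("n must be at least 1")
    # Asking for more partitions than exist leaves the count unchanged.
    if n >= len(partitions):
        return [list(p) for p in partitions]
    merged = [[] for _ in range(n)]
    for i, p in enumerate(partitions):
        # Map partition i to one of n contiguous groups; its rows move as a block.
        merged[i * n // len(partitions)].extend(p)
    return merged


five = [[1, 2], [3], [4, 5], [6], [7, 8]]
print(coalesce(five, 2))   # [[1, 2, 3, 4, 5], [6, 7, 8]]
print(coalesce(five, 10))  # still five partitions, unchanged
```

The merged partitions can end up uneven (5 rows versus 3 here), which is the trade-off against the even distribution a full repartition() gives.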




In [0]:
df = df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

#First, the data is repartitioned into 5 partitions based on DEST_COUNTRY_NAME.
#Then, it is merged into 2 partitions without shuffling all data


### Collecting Data to the Driver
Spark runs on a cluster, so a DataFrame's data is distributed across multiple machines. Sometimes you need to bring that data back to the driver machine for further processing.

Different Methods to Collect Data to the Driver
- collect() – Retrieves all rows from the DataFrame to the driver.
- take(n) – Fetches the first n rows.
- show(n, truncate) – Displays n rows in a readable format.
- toLocalIterator() – Collects partitions one by one instead of all at once.
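A plain-Python model of how much data each method pulls to the driver, with partitions modeled as lists (the helpers are hypothetical). `collect()` materializes every partition at once, `take(n)` stops scanning once it has n rows, and `toLocalIterator()` yields one partition at a time. Spark's take() actually scans a growing batch of partitions on each attempt; the model scans them one by one.

```python
def collect(partitions):
    # Every row from every partition ends up in one driver-side list.
    return [row for p in partitions for row in p]


def take(partitions, n):
    # Scan partitions in order and stop as soon as n rows are gathered.
    rows, scanned = [], 0
    for p in partitions:
        if len(rows) >= n:
            break
        scanned += 1
        rows.extend(p[: n - len(rows)])
    return rows, scanned


def to_local_iterator(partitions):
    # Only the current partition needs to be held on the driver.
    for p in partitions:
        yield from p


parts = [["Moldova", "Bolivia"], ["Algeria", "Pakistan"], ["Panama", "Liberia"]]
print(len(collect(parts)))  # 6
print(take(parts, 3))       # (['Moldova', 'Bolivia', 'Algeria'], 2)
for name in to_local_iterator(parts):
    print(name)
```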


In [0]:
# collect() – Fetch All Data
df.collect()
#Retrieves all rows to the driver.
#WARNING: If the dataset is too large, this can crash the driver.


Out[55]: [Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Bolivia', ORIGIN_COUNTRY_NAME='United States', count=30),
 Row(DEST_COUNTRY_NAME='Algeria', ORIGIN_COUNTRY_NAME='United States', count=4),
 Row(DEST_COUNTRY_NAME='Turks and Caicos Islands', ORIGIN_COUNTRY_NAME='United States', count=230),
 Row(DEST_COUNTRY_NAME='Pakistan', ORIGIN_COUNTRY_NAME='United States', count=12),
 Row(DEST_COUNTRY_NAME='Marshall Islands', ORIGIN_COUNTRY_NAME='United States', count=42),
 Row(DEST_COUNTRY_NAME='Suriname', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Panama', ORIGIN_COUNTRY_NAME='United States', count=510),
 Row(DEST_COUNTRY_NAME='New Zealand', ORIGIN_COUNTRY_NAME='United States', count=111),
 Row(DEST_COUNTRY_NAME='Liberia', ORIGIN_COUNTRY_NAME='United States', count=2),
 Row(DEST_COUNTRY_NAME='Ireland', ORIGIN_COUNTRY_NAME='United States', count=335),
 Row(DEST_COUNTRY_NAME='Zambia', ORIGIN_COUNTRY_NAME='Unit

In [0]:
#take(n) – Fetch First N Rows
df.take(5)
#Fetches only the first 5 rows.
#Unlike collect(), this does not retrieve the entire DataFrame

Out[56]: [Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Bolivia', ORIGIN_COUNTRY_NAME='United States', count=30),
 Row(DEST_COUNTRY_NAME='Algeria', ORIGIN_COUNTRY_NAME='United States', count=4),
 Row(DEST_COUNTRY_NAME='Turks and Caicos Islands', ORIGIN_COUNTRY_NAME='United States', count=230),
 Row(DEST_COUNTRY_NAME='Pakistan', ORIGIN_COUNTRY_NAME='United States', count=12)]

In [0]:
# show(n, truncate) – Pretty Print Data
df.show(5, False)
#Displays the first 5 rows.
#False ensures that long text values are not truncated (by default, strings longer than 20 characters are shortened).


+------------------------+-------------------+-----+
|DEST_COUNTRY_NAME       |ORIGIN_COUNTRY_NAME|count|
+------------------------+-------------------+-----+
|Moldova                 |United States      |1    |
|Bolivia                 |United States      |30   |
|Algeria                 |United States      |4    |
|Turks and Caicos Islands|United States      |230  |
|Pakistan                |United States      |12   |
+------------------------+-------------------+-----+
only showing top 5 rows
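With the default `truncate=True`, show() cuts any cell longer than 20 characters down to its first 17 characters plus "...", which is why a long name earlier appeared as "Saint Vincent and...". Passing an integer instead of True/False sets a different width. A plain-Python model of that display rule (`truncate_cell` is a hypothetical helper):

```python
def truncate_cell(value, truncate=20):
    # None is displayed as "null" in show() output.
    text = "null" if value is None else str(value)
    if truncate > 0 and len(text) > truncate:
        # Very small widths get no ellipsis; otherwise leave room for "...".
        return text[:truncate] if truncate < 4 else text[:truncate - 3] + "..."
    return text


print(truncate_cell("Saint Vincent and the Grenadines"))      # Saint Vincent and...
print(truncate_cell("Turks and Caicos Islands", truncate=0))  # unchanged, like show(5, False)
print(truncate_cell(None))                                    # null
```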



In [0]:
# toLocalIterator() – Fetch Data Partition-by-Partition
for row in df.toLocalIterator():
    print(row)

#This fetches partitions one by one instead of all at once.
#Safer than collect() for large datasets because the driver only needs enough memory for one partition at a time, but a single very large partition can still crash the driver.


Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)
Row(DEST_COUNTRY_NAME='Bolivia', ORIGIN_COUNTRY_NAME='United States', count=30)
Row(DEST_COUNTRY_NAME='Algeria', ORIGIN_COUNTRY_NAME='United States', count=4)
Row(DEST_COUNTRY_NAME='Turks and Caicos Islands', ORIGIN_COUNTRY_NAME='United States', count=230)
Row(DEST_COUNTRY_NAME='Pakistan', ORIGIN_COUNTRY_NAME='United States', count=12)
Row(DEST_COUNTRY_NAME='Marshall Islands', ORIGIN_COUNTRY_NAME='United States', count=42)
Row(DEST_COUNTRY_NAME='Suriname', ORIGIN_COUNTRY_NAME='United States', count=1)
Row(DEST_COUNTRY_NAME='Panama', ORIGIN_COUNTRY_NAME='United States', count=510)
Row(DEST_COUNTRY_NAME='New Zealand', ORIGIN_COUNTRY_NAME='United States', count=111)
Row(DEST_COUNTRY_NAME='Liberia', ORIGIN_COUNTRY_NAME='United States', count=2)
Row(DEST_COUNTRY_NAME='Ireland', ORIGIN_COUNTRY_NAME='United States', count=335)
Row(DEST_COUNTRY_NAME='Zambia', ORIGIN_COUNTRY_NAME='United States', count=1)
Row(DEST_CO