In [0]:
print("hello")

hello


## This is a practice notebook
### Creating dataframes using data and files

In [0]:
#Define the schema

from pyspark.sql.types import StringType,StructField,StructType,IntegerType

schema = StructType([
    StructField("Name",StringType(),True),
    StructField("Age",StringType(),True),
    StructField("City",StringType(),True),
                     ])
#Create a list of sample data
data = [
    ("Alice",30,"New York"),
    ("Bob", 25, "San Francisco"),
    ("Catherine", 27, "Chicago"),
    ("David", 35, "Boston"),
    ("Eva", 28, "Seattle")
]

# Create DataFrame using the data
df = spark.createDataFrame(data, schema)

# show data
df.show()

+---------+---+-------------+
|     Name|Age|         City|
+---------+---+-------------+
|    Alice| 30|     New York|
|      Bob| 25|San Francisco|
|Catherine| 27|      Chicago|
|    David| 35|       Boston|
|      Eva| 28|      Seattle|
+---------+---+-------------+



## Reading data from a csv file in a dataframe

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType

schema = StructType([
    StructField("Name",StringType(),True),
    StructField("Age",IntegerType(),True),
    StructField("City",StringType(),True)
])

#file path dbfs:/FileStore/shared_uploads/rajadityagaur@gmail.com/sample_data.csv
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/rajadityagaur@gmail.com/sample_data.csv")

#show df
df1.show()

+---------+---+-------------+
|     Name|Age|         City|
+---------+---+-------------+
|    Alice| 30|     New York|
|      Bob| 25|San Francisco|
|Catherine| 27|      Chicago|
|    David| 35|       Boston|
|      Eva| 28|      Seattle|
+---------+---+-------------+



## Adding extra data to the dataframe for analysis

In [0]:
#additional datta
additional_data = [
    ("Frank", 29, "Los Angeles"),
    ("Grace", 31, "Houston")
]
#creating df for this new data
additional_df = spark.createDataFrame(additional_data,schema)

In [0]:
additional_df.show()

+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|Frank| 29|Los Angeles|
|Grace| 31|    Houston|
+-----+---+-----------+



### adding both dataframes

In [0]:
combined_df = df.union(additional_df)
combined_df.show()

+---------+---+-------------+
|     Name|Age|         City|
+---------+---+-------------+
|    Alice| 30|     New York|
|      Bob| 25|San Francisco|
|Catherine| 27|      Chicago|
|    David| 35|       Boston|
|      Eva| 28|      Seattle|
|    Frank| 29|  Los Angeles|
|    Grace| 31|      Houston|
+---------+---+-------------+



In [0]:
# Define additional data including duplicates and null values
additional_data = [
    ("Frank", 29, "Los Angeles"),  # Normal data
    ("Grace", 31, "Houston"),      # Normal data
    ("Alice", 30, "New York"),     # Duplicate of existing row
    (None, 22, "Phoenix"),         # Null value in Name
    ("Hank", None, "Denver"),      # Null value in Age
    (990, 25, 0)              # Null value in City
]

#this will work since "STRING-TYPE ACCEPTS NUMBERS BUT INTEGER-TYPE DOES NOT ACCEPT STRING."
test_df = spark.createDataFrame(additional_data,schema)

test_df.show()

+-----+----+-----------+
| Name| Age|       City|
+-----+----+-----------+
|Frank|  29|Los Angeles|
|Grace|  31|    Houston|
|Alice|  30|   New York|
| null|  22|    Phoenix|
| Hank|null|     Denver|
|  990|  25|          0|
+-----+----+-----------+



## Handling null values and duplicates values and reading data in various formats

In [0]:
from pyspark.sql.types import StructField,StructType,IntegerType,StringType
# Sample data
data = [
    ("Alice", 30, "New York"),
    ("Bob", 25, "San Francisco"),
    ("Catherine", 27, "Chicago"),
    ("David", 35, "Boston"),
    ("Eva", 28, "Seattle"),
    ("Frank", 29, "Los Angeles"),  # Normal data
    ("Grace", 31, "Houston"),      # Normal data
    ("Alice", 30, "New York"),     # Duplicate of existing row
    (None, 22, "Phoenix"),         # Null value in Name
    ("Hank", None, "Denver"),      # Null value in Age
    ("Ivy", 25, None)              # Null value in City
]
schema = StructType([
    StructField("Name",StringType(),True),
    StructField("Age",IntegerType(),True),
    StructField("City",StringType(),True)
])

df = spark.createDataFrame(data,schema)
#print original Df
df.show()

# Handling null values - deleting all rows having any null value
print("Dataframe after dropping rows with any null values: ")
df_no_nulls = df.dropna()
df_no_nulls.show()

# Filling null values
df_filled = df.fillna({
    "Name":"Unknown",
    "Age":0,
    "City":"Unknown"
})
df_filled.show()

#Dropping rows where specific column is null
print("DataFrame after dropping rows with null values in specific columns:")
df_no_nulls_subset = df.dropna(subset = ["Name","City"])
df_no_nulls_subset.show()

# Handling duplicates - removing duplicate rows
print("DataFrame after removing duplicate rows:")
df_no_duplicates = df.dropDuplicates()
df_no_duplicates.show()

print("DataFrame after removing duplicate rows based on specific columns:")
df_no_duplicates_subset = df.dropDuplicates(subset = ["Name","City"])

df_no_duplicates_subset.show()

+---------+----+-------------+
|     Name| Age|         City|
+---------+----+-------------+
|    Alice|  30|     New York|
|      Bob|  25|San Francisco|
|Catherine|  27|      Chicago|
|    David|  35|       Boston|
|      Eva|  28|      Seattle|
|    Frank|  29|  Los Angeles|
|    Grace|  31|      Houston|
|    Alice|  30|     New York|
|     null|  22|      Phoenix|
|     Hank|null|       Denver|
|      Ivy|  25|         null|
+---------+----+-------------+

Dataframe after dropping rows with any null values: 
+---------+---+-------------+
|     Name|Age|         City|
+---------+---+-------------+
|    Alice| 30|     New York|
|      Bob| 25|San Francisco|
|Catherine| 27|      Chicago|
|    David| 35|       Boston|
|      Eva| 28|      Seattle|
|    Frank| 29|  Los Angeles|
|    Grace| 31|      Houston|
|    Alice| 30|     New York|
+---------+---+-------------+

+---------+---+-------------+
|     Name|Age|         City|
+---------+---+-------------+
|    Alice| 30|     New York|


## Filtering data

In [0]:
df_filtered = df.filter(df['Age'] > 25)
df_filtered.show()

#sql like syntax
df_filtered = df.filter("Age > 25")
df_filtered.show()


+---------+---+-----------+
|     Name|Age|       City|
+---------+---+-----------+
|    Alice| 30|   New York|
|Catherine| 27|    Chicago|
|    David| 35|     Boston|
|      Eva| 28|    Seattle|
|    Frank| 29|Los Angeles|
|    Grace| 31|    Houston|
|    Alice| 30|   New York|
+---------+---+-----------+

+---------+---+-----------+
|     Name|Age|       City|
+---------+---+-----------+
|    Alice| 30|   New York|
|Catherine| 27|    Chicago|
|    David| 35|     Boston|
|      Eva| 28|    Seattle|
|    Frank| 29|Los Angeles|
|    Grace| 31|    Houston|
|    Alice| 30|   New York|
+---------+---+-----------+



## sorting data by single and multiple column




In [0]:
#sorting by a single column
df_sorted_single_col = df.sort("Age")
df_sorted_single_col.show()

#Multiple columns
df_sorted_multiple_col = df.sort(df["Age"].asc(),df["Name"].desc())
df_sorted_multiple_col.show()


+---------+----+-------------+
|     Name| Age|         City|
+---------+----+-------------+
|     Hank|null|       Denver|
|     null|  22|      Phoenix|
|      Bob|  25|San Francisco|
|      Ivy|  25|         null|
|Catherine|  27|      Chicago|
|      Eva|  28|      Seattle|
|    Frank|  29|  Los Angeles|
|    Alice|  30|     New York|
|    Alice|  30|     New York|
|    Grace|  31|      Houston|
|    David|  35|       Boston|
+---------+----+-------------+

+---------+----+-------------+
|     Name| Age|         City|
+---------+----+-------------+
|     Hank|null|       Denver|
|     null|  22|      Phoenix|
|      Ivy|  25|         null|
|      Bob|  25|San Francisco|
|Catherine|  27|      Chicago|
|      Eva|  28|      Seattle|
|    Frank|  29|  Los Angeles|
|    Alice|  30|     New York|
|    Alice|  30|     New York|
|    Grace|  31|      Houston|
|    David|  35|       Boston|
+---------+----+-------------+



## Aggregations

In [0]:
from pyspark.sql.functions import min,max,avg,sum

df_agg = df.groupBy("City").agg(
    avg("Age").alias("AverageAge"),
    max("Age").alias("MaxAge"),
    min("Age").alias("MinAge"),
    sum("Age").alias("TotalAge")
)

df_agg.show()

+-------------+----------+------+------+--------+
|         City|AverageAge|MaxAge|MinAge|TotalAge|
+-------------+----------+------+------+--------+
|     New York|      30.0|    30|    30|      60|
|San Francisco|      25.0|    25|    25|      25|
|      Chicago|      27.0|    27|    27|      27|
|       Boston|      35.0|    35|    35|      35|
|      Seattle|      28.0|    28|    28|      28|
|  Los Angeles|      29.0|    29|    29|      29|
|      Houston|      31.0|    31|    31|      31|
|      Phoenix|      22.0|    22|    22|      22|
|         null|      25.0|    25|    25|      25|
|       Denver|      null|  null|  null|    null|
+-------------+----------+------+------+--------+



## Window Functions

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

window_spec = Window.partitionBy("City").orderBy("Age")

df_window = df.withColumn("row_number", row_number().over(window_spec))
df_window = df_window.withColumn("rank", rank().over(window_spec))
df_window = df_window.withColumn("dense_rank", dense_rank().over(window_spec))

df_window.show()


+---------+----+-------------+----------+----+----------+
|     Name| Age|         City|row_number|rank|dense_rank|
+---------+----+-------------+----------+----+----------+
|      Ivy|  25|         null|         1|   1|         1|
|    David|  35|       Boston|         1|   1|         1|
|Catherine|  27|      Chicago|         1|   1|         1|
|     Hank|null|       Denver|         1|   1|         1|
|    Grace|  31|      Houston|         1|   1|         1|
|    Frank|  29|  Los Angeles|         1|   1|         1|
|    Alice|  30|     New York|         1|   1|         1|
|    Alice|  30|     New York|         2|   1|         1|
|     null|  22|      Phoenix|         1|   1|         1|
|      Bob|  25|San Francisco|         1|   1|         1|
|      Eva|  28|      Seattle|         1|   1|         1|
+---------+----+-------------+----------+----+----------+



## Working with Time Formats

In [0]:
#Date and Timestamp Conversion:


## Extracting Parts of Date:

##Joins

### GroupBy and Aggregations

In [0]:
df_grouped = df.groupBy("City").agg(
    sum("Age").alias("TotalAge"),
    avg("Age").alias("AverageAge")
)
df_grouped.show()


+-------------+--------+----------+
|         City|TotalAge|AverageAge|
+-------------+--------+----------+
|     New York|      60|      30.0|
|San Francisco|      25|      25.0|
|      Chicago|      27|      27.0|
|       Boston|      35|      35.0|
|      Seattle|      28|      28.0|
|  Los Angeles|      29|      29.0|
|      Houston|      31|      31.0|
|      Phoenix|      22|      22.0|
|         null|      25|      25.0|
|       Denver|    null|      null|
+-------------+--------+----------+



## UDFs (User-Defined Functions)

In [0]:
# Example below

[0;31m---------------------------------------------------------------------------[0m
[0;31mPythonException[0m                           Traceback (most recent call last)
File [0;32m<command-1721858934126211>:9[0m
[1;32m      7[0m add_one_udf [38;5;241m=[39m udf(add_one, IntegerType())
[1;32m      8[0m df_udf [38;5;241m=[39m df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mAgePlusOne[39m[38;5;124m"[39m, add_one_udf(df[[38;5;124m"[39m[38;5;124mAge[39m[38;5;124m"[39m]))
[0;32m----> 9[0m df_udf[38;5;241m.[39mshow()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43m*[39;49m[38;5;241;43m*[39;49

## Read and Write CSV

In [0]:
#Reading
# df = spark.read.format("csv").option("header", "true").load("/path/to/file.csv")
# df = spark.read.format("json").load("/path/to/file.json")
# df = spark.read.format("parquet").load("/path/to/file.parquet")

#writing
# df.write.format("csv").option("header", "true").save("/path/to/save")
# df.write.format("json").save("/path/to/save")
# df.write.format("parquet").save("/path/to/save")


## Running sql queries

In [0]:
df.createOrReplaceTempView("people")
sql_df = spark.sql("SELECT * FROM people WHERE AGE > 25")
sql_df.show()

+---------+---+-----------+
|     Name|Age|       City|
+---------+---+-----------+
|    Alice| 30|   New York|
|Catherine| 27|    Chicago|
|    David| 35|     Boston|
|      Eva| 28|    Seattle|
|    Frank| 29|Los Angeles|
|    Grace| 31|    Houston|
|    Alice| 30|   New York|
+---------+---+-----------+

