In [0]:
# Basic Arithmetic and Math Functions

# Load the countries_population table (population_metrics catalog, default
# schema) into `df`; the following cells add derived columns to it.
df = spark.read.table("population_metrics.default.countries_population")

df.display()

In [0]:
from pyspark.sql.functions import col, round, greatest, least

In [0]:
# Forecast column: current population scaled by 1.2, rounded to one decimal.
# `round` here is pyspark.sql.functions.round, not the Python builtin.
forecast_2030 = round(col("population") * 1.2, 1)

df = df.withColumn("population_forecast_2030", forecast_2030)
df.display()

In [0]:
# Population per square kilometre, rounded to one decimal place.
density = col("population") / col("area_km2")
df = df.withColumn("population_density", round(density, 1))

df.display()

In [0]:
# least()/greatest() compare values across columns within each row,
# here the current population against the 2030 forecast.
pop_columns = ("population", "population_forecast_2030")

df.select(
    "name",
    *pop_columns,
    least(*pop_columns).alias("lower_of_pop_and_forecast_pop"),
    greatest(*pop_columns).alias("higher_of_pop_and_forecast_pop"),
).display()

In [0]:
from pyspark.sql.functions import length, char_length, upper, lower, lit, concat, concat_ws

In [0]:
# String function examples: character count plus upper/lower-cased copies
# of the country name.
name_length = length("name").alias("count_of_chars")  # char_length is an alternative
name_upper = upper("name").alias("upper")
name_lower = lower("name").alias("lower")

df.select("name", name_length, name_upper, name_lower).display()

In [0]:
# lit() wraps a Python value as a constant column, so every row carries
# the same "hello3" value next to name and capital.
constant_col = lit("hello3")

df.select("name", "capital", constant_col).display()

In [0]:
# concat() joins its arguments as-is, so the ", " separator has to be
# passed explicitly as a literal column between capital and name.
city_country = concat("capital", lit(", "), "name").alias("concat city country")

df.select("name", "capital", city_country).display()

In [0]:
# Side-by-side comparison of concat() and concat_ws().
# concat() needs the separator supplied as a literal column between the
# values, while concat_ws() takes the separator once as its first argument.
# Both expressions combine the same two columns (capital, name) so the two
# output columns can be compared directly.
df.select(
    "name",
    "capital",
    concat("capital", lit(", "), "name").alias("concat city country"),
    concat_ws(", ", "capital", "name").alias("concat ws city country"),
).display()

In [0]:
# Datetime Conversions

# Minimal DataFrame with name and hire_date; hire_date is a plain string in
# dd-MM-yyyy form at this point (see the DDL schema below).
schema = "name string, hire_date string"
data = [
    ("Alice", "15-05-2020"),
    ("Bob", "25-09-2018"),
    ("Charlie", "05-12-2022"),
]

df = spark.createDataFrame(data, schema=schema)
df.display()

# Reference formats:
#   Standard date -> yyyy-MM-dd
#   Timestamp     -> yyyy-MM-dd[T] HH:mm:ss

In [0]:
from pyspark.sql.functions import to_date, to_timestamp, date_format, curdate, current_timestamp, timestamp_diff

In [0]:
# Parse the dd-MM-yyyy strings into a new date-typed column.
df = df.withColumn(
    "hire_date_converted",
    to_date("hire_date", format="dd-MM-yyyy"),
)
df.display()

In [0]:
# Same source strings, parsed with to_timestamp() instead of to_date().
df = df.withColumn(
    "hire_timestamp",
    to_timestamp("hire_date", format="dd-MM-yyyy"),
)
df.display()

In [0]:
# Format the converted date with the "yyyy" pattern to expose the hire year.
hire_year = date_format("hire_date_converted", "yyyy")

df = df.withColumn("year", hire_year)
df.display()

In [0]:
# Date and time functions
# Rebuild the minimal name / hire_date DataFrame and convert hire_date from
# its dd-MM-yyyy string form in place. `schema` is the DDL string defined in
# the Datetime Conversions cell.
data = [
    ("Alice", "15-05-2020"),
    ("Bob", "25-09-2018"),
    ("Charlie", "05-12-2022"),
]

df = spark.createDataFrame(data, schema).withColumn(
    "hire_date", to_date("hire_date", "dd-MM-yyyy")
)

df.display()


In [0]:
# Add the current date and the current timestamp as two new columns.
df = (
    df
    .withColumn("current_date", curdate())
    .withColumn("current_timestamp", current_timestamp())
)

df.display()

In [0]:
# Whole days elapsed between each hire date and today.
# timestamp_diff(unit, start, end) returns (end - start) expressed in `unit`,
# so the unit must be DAY to match the column name, and hire_date must be the
# start argument for the result to be a positive day count.
df.withColumn(
    "days_since_hire", timestamp_diff("DAY", "hire_date", "current_date")
).display()

In [0]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

In [0]:
# Joining Data Frames

# --- df_1: sales transactions -------------------------------------------
# All fields are nullable; sale_date is kept as a string.
schema_sales = (
    StructType()
    .add("transaction_id", IntegerType(), True)
    .add("store_id", IntegerType(), True)
    .add("sale_date", StringType(), True)
    .add("sales_amount", IntegerType(), True)
)

# store_id values run from 103 to 107
data_sales = [
    (1001, 103, "2025-01-15", 5000),
    (1002, 104, "2025-01-16", 7000),
    (1003, 105, "2025-01-17", 6500),
    (1004, 106, "2025-01-18", 4800),
    (1005, 107, "2025-01-19", 5300),
]

df_1 = spark.createDataFrame(data_sales, schema=schema_sales)
df_1.show()

# --- df_2: stores ---------------------------------------------------------
schema_stores = (
    StructType()
    .add("id", IntegerType(), True)
    .add("store_name", StringType(), True)
    .add("city", StringType(), True)
)

# id values run from 101 to 105, so only 103-105 overlap with the sales data
data_stores = [
    (101, "Store A", "New York"),
    (102, "Store B", "Los Angeles"),
    (103, "Store C", "Chicago"),
    (104, "Store D", "Houston"),
    (105, "Store E", "Phoenix"),
]

df_2 = spark.createDataFrame(data_stores, schema=schema_stores)
df_2.show()


In [0]:
# Inner join: only sales whose store_id matches a store id (103, 104, 105).
store_match = df_1.store_id == df_2.id
df_1.join(df_2, on=store_match, how="inner").display()

In [0]:
# Left join: every sale is kept; sales for stores 106 and 107 have no
# matching store row, so their store columns come back null.
store_match = df_1.store_id == df_2.id
df_1.join(df_2, on=store_match, how="left").display()

In [0]:
# Right join: every store is kept; stores 101 and 102 have no sales, so
# their sales columns come back null.
store_match = df_1.store_id == df_2.id
df_1.join(df_2, on=store_match, how="right").display()

In [0]:
# Full outer join: all sales and all stores, with nulls on whichever side
# has no match.
store_match = df_1.store_id == df_2.id
df_1.join(df_2, on=store_match, how="fullouter").display()

In [0]:
# Left anti join: only the sales that have NO matching store (store_id 106
# and 107); the result contains df_1's columns only.
store_match = df_1.store_id == df_2.id
df_1.join(df_2, on=store_match, how="left_anti").display()

In [0]:
# Cross join: every sale paired with every store (5 x 5 = 25 rows).
sales_x_stores = df_1.crossJoin(df_2)
sales_x_stores.display()

In [0]:
# Joins Exercise

# Perform a join on the country's population, country, regions and country sub-regions in the default schema of Population Metrics catalog; the join should return a single result set which should be stored as a managed delta table called countries_consolidated. It should be stored in the default schema of the Population Metrics catalog.
# countries_population: country_id, name, nationality, country_code, iso_alpha2, capital, population, area_km2, region_id, sub_region_id
# countries_consolidated: country_id, country, region, sub_region, population, area_km2
# country_regions: id, name
# country_sub_regions: id, name

# Step 1. Read countries_population, country_regions, country_sub_regions tables as DataFrames
# Step 2. Join the country's population DataFrame with the country regions dataframe 
# Step 3. Resulting DataFrame should be joined with the country sub-regions DataFrame
# Step 4. The final DataFrame should have only the columns listed for countries_consolidated; then store the data from that final DataFrame in a managed delta table called countries_consolidated

In [0]:
# Read the three source tables, keeping only the columns needed to build
# countries_consolidated: country_id, country, region, sub_region, population, area_km2

# countries_population -> df_countries ("name" is exposed as "country")
df_countries = (
    spark.read.table("population_metrics.default.countries_population")
    .select(
        "country_id",
        col("name").alias("country"),
        "population",
        "area_km2",
        "region_id",
        "sub_region_id",
    )
)
df_countries.display()

# country_regions -> df_regions ("name" is exposed as "region")
df_regions = (
    spark.read.table("population_metrics.default.country_regions")
    .select("id", col("name").alias("region"))
)
df_regions.display()

# country_sub_regions -> df_sub_regions ("name" is exposed as "sub_region")
df_sub_regions = (
    spark.read.table("population_metrics.default.country_sub_regions")
    .select("id", col("name").alias("sub_region"))
)
df_sub_regions.display()


In [0]:
# Join the country population DataFrame with the country regions.
# Target layout (countries_consolidated): country_id, country, region, sub_region, population, area_km2
# Step 4. The final DataFrame should have only the columns listed for countries_consolidated; then store it as a managed delta table called countries_consolidated.

# Left join so countries without a matching region are kept; sub_region_id
# is carried along because the next join still needs it.
df_int = (
    df_countries
    .join(df_regions, on=df_countries.region_id == df_regions.id, how="left")
    .select(
        "country_id",
        "country",
        "region",
        "population",
        "area_km2",
        "sub_region_id",
    )
)

df_int.display()


In [0]:

# Attach the sub-region name and trim to the countries_consolidated layout.
# Left join: we want all the records from the left (df_int).
df_final = (
    df_int
    .join(df_sub_regions, on=df_int.sub_region_id == df_sub_regions.id, how="left")
    .select(
        "country_id",
        "country",
        "region",
        "sub_region",
        "population",
        "area_km2",
    )
)
df_final.display()

In [0]:
# Persist the consolidated result as a managed table in the default schema
# of the population_metrics catalog. df_final is the fully joined DataFrame
# (countries + regions + sub-regions) built in the preceding join cell.
df_final.write.saveAsTable("population_metrics.default.countries_consolidated")

In [0]:
# Unioning DataFrames => inputs with the same number of columns
# Unioning df_1 with itself stacks the sales rows on top of themselves.
sales_twice = df_1.union(df_1)
sales_twice.display()

In [0]:
# "city" is selected twice to pad df_2 to four columns so its column count
# matches df_1; union() then lines the columns up by position, not by name.
stores_padded = df_2.select("id", "store_name", "city", "city")
df_1.union(stores_padded).display()

In [0]:
# unionByName() aligns columns by name; allowMissingColumns=True lets the
# two frames have different column sets.
sales_and_stores = df_1.unionByName(df_2, allowMissingColumns=True)
sales_and_stores.display()

In [0]:
# Filtering and Sorting

# Work from the consolidated table written by the joins exercise.
consolidated_table = "population_metrics.default.countries_consolidated"
df = spark.read.table(consolidated_table)
df.display()

In [0]:
# Column-expression filter: keep only the rows whose region is Asia.
is_asia = df["region"] == "Asia"
df.where(is_asia).display()

In [0]:
# Same filter expressed as a SQL string instead of a column expression.
asia_condition = 'region = "Asia"'
df.filter(asia_condition).display()

In [0]:
# OR of two column conditions: rows in Asia or in Europe.
in_asia = df.region == "Asia"
in_europe = df.region == "Europe"
df.filter(in_asia | in_europe).display()

In [0]:
# SQL-string form of the Asia-or-Europe filter.
asia_or_europe = "region = 'Asia' or region = 'Europe'"
df.filter(asia_or_europe).display()

In [0]:
# AND of two column conditions: region is Asia and country is India.
in_asia = df.region == "Asia"
is_india = df.country == "India"
df.filter(in_asia & is_india).display()

In [0]:
# SQL-string form of the Asia-and-India filter.
asia_and_india = "region = 'Asia' and country = 'India'"
df.filter(asia_and_india).display()

In [0]:
# Filter on a computed value: population / area_km2 above 1000.
density = df.population / df.area_km2
df.where(density > 1000).display()

In [0]:
# SQL-string form of the density filter.
dense_condition = "(population/area_km2) > 1000"
df.where(dense_condition).display()

In [0]:
# Remove duplicates
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Schema: all three fields nullable. Note that this rebinds `schema`, which
# earlier held a DDL string.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("name", StringType(), True)
    .add("department", StringType(), True)
)

# Sample data: ("Alice", "HR") appears three times, but every row has its
# own id.
data = [
    (1, "Alice", "HR"),
    (2, "Bob", "IT"),
    (3, "Charlie", "Finance"),
    (4, "Alice", "HR"),
    (5, "David", "HR"),
    (6, "Alice", "HR"),
]

df = spark.createDataFrame(data, schema=schema)
df.display()

In [0]:
# Without a subset, rows are compared on all columns. Every sample row has
# a distinct id, so nothing is removed here.
deduped_full_row = df.dropDuplicates()
deduped_full_row.display()

In [0]:
# Restrict the comparison to "name": one row per distinct name is kept.
deduped_by_name = df.dropDuplicates(subset=["name"])
deduped_by_name.display()

In [0]:
# Sorting and Limiting

# Back to the raw countries_population table for the sorting examples.
population_table = "population_metrics.default.countries_population"
df = spark.read.table(population_table)
df.display()

In [0]:
# Ten most populous countries: sort descending, then keep the first 10 rows.
by_population_desc = df.sort("population", ascending=False)
by_population_desc.limit(10).display()

In [0]:
# Sort on two keys; the single ascending=False applies to both of them.
sort_keys = ("region_id", "population")
df.sort(*sort_keys, ascending=False).display()

In [0]:
# Per-column direction: population ascending, then region_id descending.
population_up = df.population.asc()
region_down = df.region_id.desc()
df.sort(population_up, region_down).display()

In [0]:
# orderBy() is the same as sort(): population ascending, region_id descending.
ordering = [df.population.asc(), df.region_id.desc()]
df.orderBy(*ordering).display()

In [0]:
# Grouping

# The grouping examples run against the consolidated table.
consolidated_table = "population_metrics.default.countries_consolidated"
df = spark.read.table(consolidated_table)
df.display()

In [0]:
# Inspect the type of the object groupBy() returns before any aggregation
# is applied to it.
grouped_by_region = df.groupBy("region")
type(grouped_by_region)

In [0]:
from pyspark.sql.functions import sum, avg

In [0]:
# Echo the grouped object itself; no aggregation has been requested yet.
df.groupBy(df["region"])

In [0]:
# Total and average population per region.
# The grouped sum() shortcut already returns a plain DataFrame, which has no
# avg() method, so the two shortcuts cannot be chained. Several aggregates
# over the same grouping have to be requested together through agg().
# `sum` and `avg` here are the pyspark.sql.functions versions.
df.groupBy("region").agg(
    sum("population"),
    avg("population"),
).display()

In [0]:
# Total and average population for each (region, sub_region) pair, with
# readable output column names.
population_stats = [
    sum("population").alias("total_population"),
    avg("population").alias("avg_population"),
]

df.groupBy("region", "sub_region").agg(*population_stats).display()

In [0]:
# Pivoting and Unpivoting
# The pivot operation in Spark lets us turn the unique values of one column
# into multiple new columns, aggregating the data in the process.

# One row per sub_region, one summed-population column per listed region.
# NOTE(review): the pivot values must match the region strings stored in the
# table exactly — confirm "America" is the spelling used there.
pivot_regions = ["America", "Europe"]

df_pivot = (
    df.groupBy("sub_region")
    .pivot("region", pivot_regions)
    .sum("population")
)
df_pivot.display()

In [0]:
# Reverse of the pivot: fold the America / Europe columns back into
# (region, population) rows, keeping sub_region as the identifier column.
df_pivot.unpivot(
    ids="sub_region",
    values=["America", "Europe"],
    variableColumnName="region",
    valueColumnName="population",
).display()

In [0]:
# Conditional Expressions

# Reload the consolidated table for the CASE / when() examples.
consolidated_table = "population_metrics.default.countries_consolidated"
df = spark.read.table(consolidated_table)
df.display()

In [0]:
from pyspark.sql.functions import expr, when

In [0]:
# SQL CASE expression that buckets a row by its population column.
# Branches are tested top-down: > 300000000 -> 'High', otherwise
# > 20000 -> 'Medium', everything else -> 'Small'.
# The string is handed to expr() in the cells that follow.
sql_string = """
case 
   when population > 300000000 then 'High' 
   when population > 20000 then 'Medium' 
   else 'Small' end
"""

In [0]:
# Evaluate the CASE expression held in sql_string and append the result as
# a population_class column.
population_class = expr(sql_string)
df.withColumn("population_class", population_class).display()

In [0]:
# Same classification, this time projected through select() alongside an
# explicit list of columns.
base_columns = ["country", "region", "sub_region", "population", "area_km2"]

df.select(
    *base_columns,
    expr(sql_string).alias("population_class"),
).display()

In [0]:
# Column-API equivalent of the CASE expression stored in sql_string.
# when() branches are evaluated in order and the first match wins, so the
# 'Medium' branch only sees rows that did not qualify as 'High';
# otherwise() supplies the value for rows matching no branch.
# The trailing display() triggers execution and renders the result, like
# the expr()-based cells above.
df.withColumn(
    "population_class",
    when(df.population > 300000000, "High")
    .when(df.population > 20000, "Medium")
    .otherwise("Small"),
).display()