In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Build (or reuse) the SparkSession used by every cell below:
#   - runs locally with 4 worker threads
#   - puts the PostgreSQL JDBC driver jar on the classpath for the database cells
# NOTE(review): the jar path is resolved relative to the kernel's working directory.
spark = (
    SparkSession.builder
    .master('local[4]')
    .config("spark.jars", "postgresql-42.6.0.jar")
    .getOrCreate()
)

# slide 13

In [None]:
# Sample records plus their column names: a DataFrame gives this data a structure.
columns = ["id", "name", "surname", "age", "country", "local_phone"]
input_data = [
    (1, "Simón", "Bolivar", 47, "VEN", "489 895 965"),
    (2, "Fidel", "Castro", 90, "CU", "956 268 348"),
    (3, "Jose", "Doroteo", 45, "MEX", "985 621 444"),
    (4, "Ernesto", "Guevara", 39, "AR", "895 325 481"),
    (5, "Hugo", "Chávez", 58, "VE", "489 895 965"),
    (6, "Camilo", "Cienfuegos", 27, "CUB", "956 268 348"),
    (7, "Emiliano", "Zapata", 39, "ME", "985 621 444"),
    (8, "Juan Domingo", "Perón", 78, "ARG", "985 621 444"),
]

# A simpler dataset: a flat list of Python ints.
int_list = [1, 2, 3]

# `spark.createDataFrame(int_list).toDF("value")` does not work for bare ints;
# passing the "int" type string alongside the data does.
intDF = spark.createDataFrame(int_list, "int").toDF("value")
intDF.printSchema()
intDF.show()

# No column names are passed here, so Spark assigns default ones.
complexDF = spark.createDataFrame(input_data)
complexDF.printSchema()
complexDF.show()

# slide 14

In [None]:
# DataFrame from an RDD.
# The SparkContext hangs off the SparkSession: use it to distribute the local
# records as an RDD, then turn that RDD into a DataFrame.
sc = spark.sparkContext
rdd = sc.parallelize(input_data)

df = rdd.toDF()
df.printSchema()

# slide 14

In [None]:
# RDD.toDF also accepts the list of column names for the resulting DataFrame.
columns = ["id", "name", "surname", "age", "country", "local_phone"]

sc = spark.sparkContext  # the SparkContext is reached through the SparkSession
rdd = sc.parallelize(input_data)

df = rdd.toDF(columns)
df.printSchema()

# slide 14

In [None]:
# Alternative: hand the RDD to SparkSession.createDataFrame, then rename the columns.
# This `toDF` is DataFrame.toDF(*names), taking varargs, unlike RDD.toDF(names) in
# the previous cell, which takes a list -- check each in the API documentation.
columns = ["id", "name", "surname", "age", "country", "local_phone"]
rdd = sc.parallelize(input_data)

df = spark.createDataFrame(rdd).toDF(*columns)
df.printSchema()

# slide 14

In [None]:
# DataFrame from pyspark.sql.Row objects: each tuple becomes a positional Row and
# the column names are supplied separately.
rowData = [Row(*record) for record in input_data]
df = spark.createDataFrame(rowData, columns)
df.printSchema()

# slide 14

In [None]:
# Explicit schema built from pyspark.sql.types structures: the most consistent approach,
# because column names, types and nullability are fixed up front instead of inferred.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Every field is nullable (third argument True). `age` holds Python ints in
# input_data, so it is declared IntegerType to keep it numeric for the filters,
# comparisons and averages on `age` further down (and in every file/table written from df).
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("surname", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("country", StringType(), True),
    StructField("local_phone", StringType(), True),
])

df = spark.createDataFrame(data=input_data, schema=schema)
df.printSchema()
df.show()

# slide 14

In [None]:
# Writing CSV output (with a header line). Each target is a directory of part
# files; coalesce(1) first collapses the data to a single partition so that the
# "one_file" directory receives a single part file.
output_path1 = "../data/csv_example/many_files"
output_path2 = "../data/csv_example/one_file"

for target_path, frame in ((output_path1, df), (output_path2, df.coalesce(1))):
    frame.write.mode('overwrite').csv(target_path, header=True)

# slide 14

In [None]:
# Reading the single-file CSV output back, three times with increasingly explicit options:
#   1. defaults only
#   2. header=True                       -> the first line supplies the column names
#   3. header=True + inferSchema=True    -> column types are inferred from the data
read_path = '../data/csv_example/one_file'

read_variants = [
    ('+ Read csv from file: \n', {}),
    ('+ Read csv with header from file: \n', {'header': True}),
    ('+ Read csv with header and infering data types from file: \n',
     {'header': True, 'inferSchema': True}),
]

for message, options in read_variants:
    print(message)
    df_from_csv = spark.read.csv(read_path, **options)
    df_from_csv.printSchema()
    df_from_csv.show()

# slide 14

In [None]:
# Saving JSON output: each target is a directory of part files; coalesce(1)
# collapses the data to one partition so "one_file" receives a single part file.
output_path1 = "./data/json_example/many_files"
output_path2 = "./data/json_example/one_file"

# Write the DataFrame to JSON.
# "multiline" is a JSON *read* option (for records spanning several lines); it does
# not change what the writer emits, so it is not set here.
df.write.mode('overwrite').json(output_path1)
df.coalesce(1).write.mode('overwrite').json(output_path2)

# slide 14

In [None]:
# Read the multi-file JSON output back into a DataFrame.

read_path = "./data/json_example/many_files"

print('+ Read json from file: \n')
df_from_json = spark.read.json(read_path)
df_from_json.printSchema()
df_from_json.show()

# slide 14

In [None]:
# Write the DataFrame to a PostgreSQL table over JDBC.
# Requires an existing database on a local/remote PostgreSQL server and the
# PostgreSQL JDBC driver jar configured on the SparkSession.
import os

# Connection settings. Values come from the environment when set, so real
# credentials need not be hardcoded in the notebook; the fallbacks are the
# local demo settings.
db_properties = {
    "url": os.environ.get("PYSPARK_DB_URL", "jdbc:postgresql://localhost:5432/pyspark_db"),
    "driver": "org.postgresql.Driver",
    "user": os.environ.get("PYSPARK_DB_USER", "pyspark"),
    "password": os.environ.get("PYSPARK_DB_PASSWORD", "password"),
}

table_name = "nice_guys"

# Write the DataFrame to the database.
df.write \
    .jdbc(url=db_properties["url"],
          table=table_name,
          mode="overwrite",  # "append", "ignore" or "error"/"errorifexists" are also accepted
          properties=db_properties)

# slide 14

In [None]:
# Read a DataFrame back from the PostgreSQL database over JDBC.
import os

# Connection settings. Values come from the environment when set, so real
# credentials need not be hardcoded in the notebook; the fallbacks are the
# local demo settings.
db_properties = {
    "url": os.environ.get("PYSPARK_DB_URL", "jdbc:postgresql://localhost:5432/pyspark_db"),
    "driver": "org.postgresql.Driver",
    "user": os.environ.get("PYSPARK_DB_USER", "pyspark"),
    "password": os.environ.get("PYSPARK_DB_PASSWORD", "password"),
}

table_name = "nice_guys"

# Use the table_name variable rather than repeating the literal.
df_from_postgresql = spark.read \
    .jdbc(url=db_properties["url"],
          table=table_name,
          properties=db_properties)

df_from_postgresql.show()
# Later cells continue from the database copy of the data.
df = df_from_postgresql

# slide 14

In [None]:
# Round-trip the DataFrame through Parquet files on disk.
# This writes plain Parquet files at a path; nothing is registered in the Spark
# catalog here (the next cell does that with a temporary view).
df.write.mode('overwrite').parquet('./data/parquet_example')

# Load the Parquet files back through the generic format/load reader API.
df = spark.read.format('parquet').load('./data/parquet_example')
df.show()

# slide 16

In [None]:
# Register the DataFrame in the Spark catalog as a temporary view, so it can be
# referenced by name from spark.read.table and spark.sql.
access_table_name = 'my_table'
df.createOrReplaceTempView(access_table_name)

# Show what the catalog currently exposes.
print('Available tables:')
tables = spark.catalog.listTables()
for entry in tables:
    print('+ ', entry.name)

# slide 16

In [None]:
# Fetch a DataFrame by its catalog name; this works once the view has been registered.
df = spark.table(access_table_name)
df.show()

# slide 16

In [None]:
# Use SQL on a DataFrame through its temporary view.

access_table_name = 'my_table'
# Reference the column as declared (`age`); a differently-cased name such as
# `Age` only resolves while spark.sql.caseSensitive is false.
result = spark.sql(f"SELECT * FROM {access_table_name} WHERE age >= 45")
result.show()

# slide 16

In [None]:
# Filter rows with a Column predicate (`where` is an alias of `filter`).

from pyspark.sql.functions import col

df.where(col('age') < 45).show()

# slide 17

In [None]:
# Sort (alias of orderBy) by one column at a time, first by id, then by age.

for sort_key in ('id', 'age'):
    df.sort(sort_key).show()

# slide 17

In [None]:
# Drop a column: drop returns a new DataFrame without `name`; `df` itself is unchanged.

without_name = df.drop('name')
without_name.show()

# slide 17

In [None]:
# Add a column to a DataFrame.

from pyspark.sql.functions import col

# withColumn returns a new DataFrame with an extra boolean column;
# `young` is True for people under 30.
new_df = df.withColumn('young', col('age') < 30)
new_df.show()

# SQL equivalent over the 'my_table' temporary view.
access_table_name = 'my_table'
spark.sql(f"SELECT *, (age < 30) AS young FROM {access_table_name}").show()

# slide 17

In [None]:
# The same people, extended with each country's international phone prefix.
columns = ["id", "name", "surname", "age", "country", "local_phone", "phone_code"]
input_data = [
    (1, "Simón", "Bolivar", 47, "VEN", "489 895 965", "+58"),
    (2, "Fidel", "Castro", 90, "CU", "956 268 348", "+53"),
    (3, "Jose", "Doroteo", 45, "MEX", "985 621 444", "+52"),
    (4, "Ernesto", "Guevara", 39, "AR", "895 325 481", "+54"),
    (5, "Hugo", "Chávez", 58, "VE", "489 895 965", "+58"),
    (6, "Camilo", "Cienfuegos", 27, "CUB", "956 268 348", "+53"),
    (7, "Emiliano", "Zapata", 39, "ME", "985 621 444", "+52"),
    (8, "Juan Domingo", "Perón", 78, "ARG", "985 621 444", "+54"),
]

# Column names passed directly as the schema; types are inferred from the tuples.
mod_df = spark.createDataFrame(input_data, columns)
mod_df.show()

# slide 21

In [None]:
# Grouping and aggregation: number of rows per phone prefix.

from pyspark.sql.functions import count

rows_per_group = count("*").alias("item_count")
result = mod_df.groupBy("phone_code").agg(rows_per_group)
result.show()

# slide 21

In [None]:
# Grouping with two aggregates per phone prefix: the row count and the list of ids.

from pyspark.sql.functions import collect_list, count

aggregations = [
    count("*").alias("item_count"),
    collect_list("id").alias("ids"),
]
result = mod_df.groupBy("phone_code").agg(*aggregations)
result.show()

# slide 21

In [None]:
# Average age per phone prefix.

from pyspark.sql.functions import avg

by_prefix = mod_df.groupBy("phone_code")
result = by_prefix.agg(avg('age').alias("age_avg"))

result.show()

# slide 21

In [None]:
# collect_list vs collect_set.

from pyspark.sql.functions import collect_list, collect_set, col

# `young` is True for people under 30.
df = mod_df.withColumn('young', col('age') < 30)
df.show()

# collect_list gathers every phone_code per group, duplicates included.
result = df.groupBy("young") \
    .agg(collect_list("phone_code").alias("phone_codes"))
result.show(truncate=False)

# collect_set gathers only the distinct phone_codes per group.
result = df.groupBy("young") \
    .agg(collect_set("phone_code").alias("phone_codes"))
result.show(truncate=False)


# slide 21

In [None]:
# Explode example: collapse rows into arrays, then expand them again.

from pyspark.sql.functions import explode, collect_list, count

# One row per phone prefix, carrying the count and the array of names sharing it.
result = (
    mod_df
    .groupBy("phone_code")
    .agg(count("*").alias("item_count"), collect_list("name").alias("names"))
)
result.show()

# explode turns each element of the `names` array back into its own row.
result = result.select('phone_code', explode('names').alias('name'))

result.show()


# slide 21

In [None]:
# Timestamps: parse strings into timestamp values with to_timestamp.

from pyspark.sql.functions import to_timestamp

df = spark.createDataFrame(
        data=[("1", "2023-09-23 05:23:02.013")],
        schema=["id", "event"])
df.printSchema()

# Timestamp string -> timestamp column. No format is given, so the string must
# already be in the default layout 'yyyy-MM-dd HH:mm:ss.SSS'.
df.withColumn("timestamp", to_timestamp("event")) \
  .show()

# Custom format: day first, then month.
df = spark.createDataFrame(
        data=[("1", "23-09-2023 05:23:02.013")],
        schema=["id", "event"])
df.printSchema()

# Timestamp string -> timestamp column; the pattern describes the input exactly
# ('SSS' = the 3-digit fraction of a second).
df.withColumn("timestamp", to_timestamp("event", 'dd-MM-yyyy HH:mm:ss.SSS')) \
  .show()

# Custom format, be careful: things can go wrong.
df = spark.createDataFrame(
        data=[("1", "23-09-2023 05:23:02.013")],
        schema=["id", "event"])
df.printSchema()

# Same input, but with month and day swapped in the pattern: it asks for month 23,
# so the string cannot be parsed.
# NOTE(review): whether this shows null or raises depends on the Spark version and
# its spark.sql.legacy.timeParserPolicy / ANSI settings -- confirm on your setup.
df.withColumn("timestamp", to_timestamp("event", 'MM-dd-yyyy HH:mm:ss.SSS')) \
  .show()

# slide 21

In [None]:
# Time differences: days elapsed between each date string and today.

from pyspark.sql.functions import current_date, datediff, col

data = [("1", "2023-09-28"), ("2", "2001-09-11"), ("3", "1989-11-09")]
df = spark.createDataFrame(data=data, schema=["id", "date"])

today = current_date()
df.select(
    col("date"),
    today.alias("current_date"),
    datediff(today, col("date")).alias("datediff"),
).show()

#slide 21

In [None]:
# Sorting with explicit null placement.

from pyspark.sql.functions import asc_nulls_first

# Ascending by age, with any null ages placed before the non-null ones.
# mod_df contains no null ages, so here the output looks like a plain ascending sort.
result = mod_df.orderBy(asc_nulls_first("age"))
result.show()

#slide 21

In [None]:
# Using regular expressions.

from pyspark.sql.functions import col, regexp_extract, regexp_replace, concat_ws, upper

# Capture group 1 = the first run of digits, e.g. "+58" -> "58".
prefix_pattern = r'(\d+)'

# regexp_extract: keep only the digits of the "phone_code" column (drops the "+").
code_digits_df = mod_df.withColumn("phone_code", regexp_extract(col("phone_code"), prefix_pattern, 1))
code_digits_df.show()

# regexp_replace: remove all whitespace from "local_phone" ("489 895 965" -> "489895965").
compact_phone_df = mod_df.withColumn("local_phone", regexp_replace(col("local_phone"), r'\s+', ''))
compact_phone_df.show()

# Both combined: one digits-only "phone_number" (prefix digits + local number),
# with the two source columns dropped and the name columns upper-cased.
contacts_df = (
    mod_df
    .withColumn("phone_number",
                concat_ws("",
                          regexp_extract(col("phone_code"), prefix_pattern, 1),
                          regexp_replace(col("local_phone"), r'\s+', '')))
    .drop('phone_code', 'local_phone')
    .withColumn("name", upper(col("name")))
    .withColumn("surname", upper(col("surname")))
)
contacts_df.show()


#slide 21

In [None]:
# User-defined function (UDF): a Python function that Spark calls on each row's value.
# Without the udf wrapper, calling the function on a Column only builds a native
# Column expression (`x % 2 == 0`) and no Python UDF runs at all.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType


@udf(returnType=BooleanType())
def is_even_udf(x):
    """Return True if x is even, False if odd, and None for a null input."""
    if x is None:
        return None
    return x % 2 == 0


numbers_df = spark.range(1, 100).toDF("x")

result_df = numbers_df.select(col("x"), is_even_udf(col("x")).alias("is_even"))

result_df.printSchema()
# Show a few rows so the UDF actually executes (printSchema alone runs no job).
result_df.show(5)

#slide 22

In [None]:
# UDAF (user-defined aggregate function) via a grouped-aggregate pandas UDF.

from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

# Reduces one group's `age` values (handed over as a pandas Series) to their mean.
# NOTE(review): PandasUDFType is deprecated in Spark 3.x in favour of Python type
# hints (`def f(v: pd.Series) -> float`); kept here for older Spark versions.
@pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)
def average_age_udaf(v):
    return v.mean()

# Average age per country. This must run on mod_df, which has `country` and `age`:
# `df` has been rebound by several earlier cells (dates, numeric ranges) and does
# not hold the people data at this point.
result = mod_df.groupBy("country").agg(average_age_udaf(col("age")).alias("average_age"))

# Show the result
result.show()

# slide 23

In [None]:
# Stop the SparkSession now that the notebook is finished.
spark.stop()