# Set up Spark Environment

In [None]:

!apt-get install openjdk-17-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz

In [None]:
!tar xf spark-3.3.2-bin-hadoop3.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
        .master('local[*]')
        .appName('Intro to Spark')
        .config('spark.ui.port', '4050')
        .getOrCreate())
spark

In [None]:
sc = spark.sparkContext


# RDD

In [None]:
data = [("Finance", 10), ("Marketing", 20), ("IT", 30), ("Sales", 40), ("Administration",50)]
rdd = sc.parallelize(data)


In [None]:
print(type(rdd))

In [None]:
rdd_filter = rdd.filter(lambda row: row[0] == "Finance" )

In [None]:
rdd_filter_coll = rdd_filter.collect()
rdd_filter_coll

In [None]:
rdd_coll = rdd.collect()

In [None]:
for row in rdd_coll:
  print(row[0] + "," + str(row[1]))

# DataFrame

In [None]:
columns = ["Department", "Employees"]
df = rdd.toDF(columns)

In [None]:
print(type(df))

In [None]:
df.printSchema()

In [None]:
df.show()

In [None]:
from pyspark.sql.functions import col
df.filter(col("Department") == "Finance").show()

In [None]:
df.filter("Department == 'Finance'").show()

# DataFrame Transformations

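Upload the file `people.csv` to the notebook's working directory (`/content` on Colab) before running the cells below; `spark.read.csv("people.csv")` resolves the relative path against that directory.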

In [None]:
df_csv = spark.read.csv("people.csv", header=True, inferSchema=True)
df_csv

In [None]:
df_csv.printSchema()

In [None]:
df_csv.show(5)

In [None]:
df_csv.select('*').show(5)

In [None]:
from pyspark.sql.functions import lit

In [None]:

df_csv.select('first_name', 'last_name', lit(10).alias('Numero_maglia')).show(5)

In [None]:
df_csv.selectExpr('id', 'first_name', 'last_name', "concat(first_name,' ', last_name ) AS full_name").show(5)

In [None]:
df_csv.filter(df_csv.id < 5).show()

In [None]:
from pyspark.sql.functions import col
df_csv.filter(col('last_name') == 'Shave').show()

In [None]:
df_csv.filter("ip_address = '32.94.154.64'").show()

In [None]:
employee = [(1, "Finance"), (2, "IT"), (3, "Administration"), (4, "IT"), (5, "Finance")]
schema_employee = ["id", "department"]
df_employee = spark.createDataFrame(employee, schema_employee)
df_employee.show()

In [None]:
df_csv_1 = df_csv.filter('id < 8').select('id', 'first_name', 'city')

df_join = df_csv_1.join(df_employee, 'id', 'left')

df_join.show()

In [None]:
df_csv.join(df_employee,"id", "inner").explain(extended=False)

In [None]:
from pyspark.sql.functions import broadcast
df_broadcast = df_csv.join(broadcast(df_employee),"id", "inner")
df_broadcast.show()

In [None]:
df_broadcast.explain(extended=False)

In [None]:
city = [(1, "Catania"), (2, "Palermo"), (3, "Messina"), (4, "Siracusa"), (5, "Trapani")]
schema_city = ["id", "city"]
df_city = spark.createDataFrame(city, schema_city)
df_city.show()

In [None]:
df_city.union(df_city).orderBy('id').show()

In [None]:
df_city.union(df_city).distinct().orderBy('id').show()

In [None]:
df_csv.dropDuplicates(['gender']).show()

In [None]:
df_csv.limit(2).show()

In [None]:
df_csv.groupBy('country').agg({'id' : 'count'}).show()

In [None]:
df_csv.filter(col('country') == 'France' ).count()

In [None]:
df_csv.groupBy('country').max('id').show(4)

In [None]:
from pyspark.sql.functions import when
df_csv.select('id', 'country') \
    .withColumn('isFrance', when(col('country') == 'France', 1).otherwise(0)) \
    .show()


In [None]:
df_csv.select('id', 'country') \
    .withColumnRenamed('country', 'nazione') \
    .show(10)

In [None]:
df_csv.drop('ethereum_address', 'country') \
  .show(5)

# PySpark Functions

In [None]:
from pyspark.sql.functions import concat
df_csv.select(col('first_name'), col('last_name'), concat(col('first_name'), col('last_name')).alias('full_name')) \
  .show(5)

In [None]:
from pyspark.sql.functions import concat_ws
df_csv.select(col('first_name'), col('last_name'), concat_ws(" ", col('first_name'), col('last_name')).alias('full_name')) \
  .show(5)

In [None]:
from pyspark.sql.functions import substring
df_csv.select(col('ethereum_address'), substring('ethereum_address',0, 5)) \
  .show(5, truncate=False)

In [None]:
df_test = spark.range(1,10) \
    .withColumn("null_even", when(col("id") % 2 == 0, None).otherwise('ok'))

df_test.show()

In [None]:
from pyspark.sql.functions import coalesce
df_test.withColumn('test_coalesce', coalesce(col('null_even'), col('id'))) \
    .show()

In [None]:
from pyspark.sql.functions import current_date
df_date = (spark.range(1,10)
      .withColumn("current_date", current_date()))
      
df_date.show()

In [None]:
from pyspark.sql.functions import year, month, dayofmonth

(df_date.withColumn("year", year(col("current_date")))
    .withColumn("month", month(col("current_date")))
    .withColumn("day", dayofmonth(col("current_date")))
    .show()
  )

In [None]:
df_csv.filter(col('gender').isin('Male', 'Female')).show()

#### Repartitioning and Caching

In [None]:
df_repartion = spark.range(0,18, numPartitions=6)
df_repartion.show()

In [None]:
print(df_repartion.rdd.getNumPartitions())

In [None]:
(df_repartion
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("output_csv"))

In [None]:
(df_repartion
    .repartition(4)
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("output_csv"))

In [None]:
(df_repartion
    .coalesce(4)
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("output_csv"))

In [None]:
from pyspark.sql.functions import count
df_csv.groupBy('country') \
    .agg(count("id").alias("count")) \
    .orderBy("count", ascending=False) \
    .show()

In [None]:
df_filter = df_csv.filter(col('country') == 'France') 
  

In [None]:
df_filter.count()

In [None]:
df_filter.cache().count()

In [None]:
df_filter.count()

In [None]:
df_filter.unpersist().count()

#### UDF

In [None]:
def convertCase(str):
    """Upper-case the first character of every space-separated word.

    This is the body of the ``convertUDF`` PySpark UDF, so it receives one
    column value per row; a SQL NULL arrives as ``None``.

    Args:
        str: Text to convert. The parameter name shadows the builtin ``str``
            and is kept only so existing keyword callers keep working.

    Returns:
        The converted text with words re-joined by single spaces exactly where
        the input had them (no trailing space), or ``None`` if the input is
        ``None``.
    """
    if str is None:
        # Propagate NULL instead of failing with AttributeError on None.split.
        return None
    words = str.split(" ")
    # join() avoids both the trailing separator and repeated string rebuilds.
    return " ".join(word[:1].upper() + word[1:] for word in words)

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Converting function to UDF 
convertUDF = udf(lambda z: convertCase(z),StringType())

In [None]:
phrases = [(1, "all you need is love"), (2, "helter skelter"), (3, "another day in life")]
schema_phrases = ["id", "phrase"]
df_phrases = spark.createDataFrame(phrases, schema_phrases)

In [None]:
(df_phrases.withColumn("phrase_upper_first_letter", convertUDF(col("phrase"))) 
   .show(truncate=False))

# Spark SQL

In [None]:
df_csv.createOrReplaceTempView("people_table")

In [None]:
df_sql = spark.sql(" SELECT COUNTRY, COUNT(ID) AS COUNT FROM people_table GROUP BY COUNTRY ORDER BY COUNT DESC")

In [None]:
print(type(df_csv))

In [None]:
print(type(df_sql))

In [None]:
df_sql.show()

In [None]:
(spark.sql(""" 
        SELECT COUNTRY, COUNT(ID) AS COUNT 
        FROM people_table 
        GROUP BY COUNTRY 
        HAVING COUNT >= 50 
        ORDER BY COUNT DESC
    """)
       .show())

In [None]:
from pyspark.sql.functions import when
(spark.sql(""" 
          SELECT ID,
            COUNTRY, 
            CASE 
              WHEN country = 'France' THEN 1
              ELSE 0
            END AS isFrance
          FROM people_table 
        """)
      .show())
    

In [None]:
(spark.sql(""" 
          SELECT *
          FROM people_table 
          WHERE gender IN ('Male', 'Female')
        """)
      .show())


In [None]:
(df_csv.groupBy('country') 
        .agg(count("id").alias("count")) 
        .where(col("count") >= 50 )
        .orderBy("count", ascending=False) 
        .show())

In [None]:
spark.catalog.dropTempView("people_table")

# DataFrame Actions

In [None]:
df_csv.take(2)

In [None]:
df_csv.limit(2).collect()

In [None]:
df_csv.show(1, truncate=False)

In [None]:
df_csv.printSchema()

In [None]:
df_csv.count()

In [None]:
(df_csv.limit(10)
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("output_csv"))

In [None]:
(spark
  .read
  .option("header", True)
  .csv("output_csv")
  .show())

In [None]:
(df_csv.limit(10)
    .write
    .mode("overwrite")
    .parquet("output_parquet"))

In [None]:
(spark
  .read
  .parquet("output_parquet")
  .show())