To run the demo inside a Docker container, proceed as follows:

1. Make sure Docker is installed locally.
2. Run `docker pull jupyter/pyspark-notebook` to fetch the latest image from Docker Hub.
3. Run `docker run --name pyspark-notebook -p 8888:8888 -p 4040:4040 -d jupyter/pyspark-notebook:latest` to start a container (port 8888 serves Jupyter, port 4040 the Spark web UI).
4. To stop the container, run `docker stop pyspark-notebook`.

Once the container is running, use VSCode (or your preferred IDE) to attach to it.


In [None]:
# Required package (already included in the jupyter/pyspark-notebook image)
# !conda install -y -q pyspark

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DoubleType, LongType

spark = SparkSession.builder.appName('demo_02_01').getOrCreate()

Read the CSV files into a DataFrame using an explicit schema:

In [None]:
schema = StructType([
    StructField("artist", StringType(), True),
    StructField("auth", StringType(), True),
    StructField("firstName", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("itemInSession", IntegerType(), True),
    StructField("lastName", StringType(), True),
    StructField("length", DoubleType(), True),
    StructField("level", StringType(), True),
    StructField("location", StringType(), True),
    StructField("method", StringType(), True),
    StructField("page", StringType(), True),
    StructField("registration", DoubleType(), True),
    StructField("sessionId", IntegerType(), True),
    StructField("song", StringType(), True),
    StructField("status", IntegerType(), True),
    StructField("ts", LongType(), True),
    StructField("userAgent", StringType(), True),
    StructField("userId", StringType(), True),
])
df = spark.read.option("header", True).csv("data", schema=schema)
df.printSchema()

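Read the CSV files into a DataFrame and let Spark infer the column types. `inferSchema` is disabled by default (without it every column is read as a string), so it must be enabled explicitly; inference costs one extra pass over the data:

In [None]:
df = spark.read.option("delimiter", ",").option("header", True).option("inferSchema", True).csv("data")
df.printSchema()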

In [None]:
df.show(n=5, truncate=False)

Filter rows on a column value, keeping only events whose `page` is `NextSong`:

In [None]:
print(f"Row count before filtering: {df.count()}")

df = df.filter(F.col("page") == "NextSong")

print(f"Row count after filtering: {df.count()}")

Transform the DataFrame into a *users table*: one row per user, with renamed columns and rows without a user ID removed:

In [None]:
users_table = (
    df.dropDuplicates(["userId"])
    .select(
        F.col("userId").alias("user_id"),
        F.col("firstName").alias("first_name"),
        F.col("lastName").alias("last_name"),
        F.col("gender"),
        F.col("level"),
    )
    .where(F.col("user_id").isNotNull())
)
users_table.show(n=5, truncate=False)


Write the users table as Parquet, overwriting any existing output:

In [None]:
users_table.write.mode('overwrite').parquet("output_data/users_table")