In [1]:
pip install pyspark

[0mNote: you may need to restart the kernel to use updated packages.


In [2]:
# Librerie
from pyspark.sql import SparkSession
from pyspark.sql.window import Window as W
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [3]:
# Creazione Spark Session
spark = (SparkSession.builder.appName("Azienda").getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/26 17:27:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Percorso dei csv file che vogliamo leggere
dipendenti_fpath = "/kaggle/input/azienda/Dipendenti.csv"
reparti_fpath = "/kaggle/input/azienda/Reparti.csv"
tickets_fpath = "/kaggle/input/azienda/Tickets.csv"

In [5]:
# Creazione dei dataframe

dipendenti_df = (spark.read.format("csv")
                .option("header", True)
                .option("inferSchema", True)
                .option("delimiter", ";")
                .load(dipendenti_fpath))

reparti_df = (spark.read.format("csv")
             .option("header", True)
             .option("inferSchema", True)
             .option("delimiter", ";")
             .load(reparti_fpath))

tickets_df = (spark.read.format("csv")
             .option("header", True)
             .option("inferSchema", True)
             .option("delimiter", ";")
             .load(tickets_fpath))

dipendenti_df.show(10)
reparti_df.show(10)
tickets_df.show(10)

+--------+--------+-------+---------+
|    Nome| Cognome|Reparto|Stipendio|
+--------+--------+-------+---------+
|  Andrea|  Assini|     IT|     1500|
|   Bruno| Bettini|     IT|     2000|
|   Carlo|Cicerini|     IT|     1800|
|   Dario|  Desini|     HR|     1200|
|   Elena| Ettorre|     HR|     1600|
|Federica| Fierini|     HR|     2200|
|  Giulia|Giorgini|     HR|     1400|
|   Ivana|  Itteri|    CEO|     3000|
+--------+--------+-------+---------+

+-------+--------------+-------+
|Reparto|  Responsabile|   Sede|
+-------+--------------+-------+
|     IT| Matteo Micini|   Roma|
|     HR|Nunzia Nimetti| Milano|
|    CEO| Orazio Ovinci|Bologna|
+-------+--------------+-------+

+-------------+-------+------+
|Ticket Number|Reparto|Inflow|
+-------------+-------+------+
|            1|     IT|   100|
|            2|     HR|     0|
|            3|     IT|   300|
|            4|     IT|   150|
|            5|     HR|     0|
|            6|     IT|   200|
|            7|     HR|     0|
|

In [6]:
# NB: invece di usare infereSchema potevamo definire manualmente gli schemi

schema = T.StructType([
        T.StructField("Nome", T.StringType(), True),
        T.StructField("Cognome", T.StringType(), True),
        T.StructField("Reparto", T.StringType(), True),
        T.StructField("Stipendio", T.IntegerType(), True)])

dipendenti_data_raw = (spark.read.format("csv")
                       .option("header", True)
                       .option("delimiter", ";")
                       .schema(schema) # <---
                       .load(dipendenti_fpath))

dipendenti_data_raw.show()

+--------+--------+-------+---------+
|    Nome| Cognome|Reparto|Stipendio|
+--------+--------+-------+---------+
|  Andrea|  Assini|     IT|     1500|
|   Bruno| Bettini|     IT|     2000|
|   Carlo|Cicerini|     IT|     1800|
|   Dario|  Desini|     HR|     1200|
|   Elena| Ettorre|     HR|     1600|
|Federica| Fierini|     HR|     2200|
|  Giulia|Giorgini|     HR|     1400|
|   Ivana|  Itteri|    CEO|     3000|
+--------+--------+-------+---------+



In [7]:
# Rimozione record indesiderati
tickets_df = tickets_df.filter(F.col("Reparto")!="NULL")
tickets_df.show()

+-------------+-------+------+
|Ticket Number|Reparto|Inflow|
+-------------+-------+------+
|            1|     IT|   100|
|            2|     HR|     0|
|            3|     IT|   300|
|            4|     IT|   150|
|            5|     HR|     0|
|            6|     IT|   200|
|            7|     HR|     0|
|            8|     IT|   800|
+-------------+-------+------+



In [8]:
# Nome e cognome di coloro che hanno uno stipendio superiore a 2000 euro
dipendenti_df.select("Nome", "Cognome", "Stipendio").filter(F.col("Stipendio")>2000).show()

+--------+-------+---------+
|    Nome|Cognome|Stipendio|
+--------+-------+---------+
|Federica|Fierini|     2200|
|   Ivana| Itteri|     3000|
+--------+-------+---------+



In [9]:
# Numero di dipendenti per ogni reparto
numero_dipendenti_df = dipendenti_df.groupBy(F.col("Reparto")).agg(F.count("*").alias("Numero dipendenti"))
numero_dipendenti_df = numero_dipendenti_df.orderBy(F.col("Numero dipendenti").desc())
numero_dipendenti_df.show()

+-------+-----------------+
|Reparto|Numero dipendenti|
+-------+-----------------+
|     HR|                4|
|     IT|                3|
|    CEO|                1|
+-------+-----------------+



In [10]:
# Join tabelle (DEMO)

dipendenti_df = dipendenti_df.withColumnRenamed("Reparto", "Reparto_d")
reparti_df = reparti_df.withColumnRenamed("Reparto", "Reparto_r")
tickets_df = tickets_df.withColumnRenamed("Reparto", "Reparto_t")

azienda_df = dipendenti_df.join(reparti_df, dipendenti_df["Reparto_d"] == reparti_df["Reparto_r"]).join(tickets_df, dipendenti_df["Reparto_d"] == tickets_df["Reparto_t"])

azienda_df = azienda_df.drop("Reparto_r").drop("Reparto_t").withColumnRenamed("Reparto_d", "Reparto")

azienda_df.show()

+------+--------+-------+---------+--------------+------+-------------+------+
|  Nome| Cognome|Reparto|Stipendio|  Responsabile|  Sede|Ticket Number|Inflow|
+------+--------+-------+---------+--------------+------+-------------+------+
|Andrea|  Assini|     IT|     1500| Matteo Micini|  Roma|            8|   800|
|Andrea|  Assini|     IT|     1500| Matteo Micini|  Roma|            6|   200|
|Andrea|  Assini|     IT|     1500| Matteo Micini|  Roma|            4|   150|
|Andrea|  Assini|     IT|     1500| Matteo Micini|  Roma|            3|   300|
|Andrea|  Assini|     IT|     1500| Matteo Micini|  Roma|            1|   100|
| Bruno| Bettini|     IT|     2000| Matteo Micini|  Roma|            8|   800|
| Bruno| Bettini|     IT|     2000| Matteo Micini|  Roma|            6|   200|
| Bruno| Bettini|     IT|     2000| Matteo Micini|  Roma|            4|   150|
| Bruno| Bettini|     IT|     2000| Matteo Micini|  Roma|            3|   300|
| Bruno| Bettini|     IT|     2000| Matteo Micini|  

In [11]:
# Per ogni dipendente, il responsabile
responsabili_df = dipendenti_df.join(reparti_df, dipendenti_df["Reparto_d"]==reparti_df["Reparto_r"])
responsabili_df = responsabili_df.select("Nome", "Cognome", "Responsabile")
responsabili_df.show()

+--------+--------+--------------+
|    Nome| Cognome|  Responsabile|
+--------+--------+--------------+
|  Andrea|  Assini| Matteo Micini|
|   Bruno| Bettini| Matteo Micini|
|   Carlo|Cicerini| Matteo Micini|
|   Dario|  Desini|Nunzia Nimetti|
|   Elena| Ettorre|Nunzia Nimetti|
|Federica| Fierini|Nunzia Nimetti|
|  Giulia|Giorgini|Nunzia Nimetti|
|   Ivana|  Itteri| Orazio Ovinci|
+--------+--------+--------------+



In [12]:
# Per ogni reparto, il numero di ticket 
tickets_df.withColumnRenamed("Reparto_t", "Reparto").groupBy("Reparto").agg(F.count("*").alias("Tickets_tot")).show()

+-------+-----------+
|Reparto|Tickets_tot|
+-------+-----------+
|     HR|          3|
|     IT|          5|
+-------+-----------+



In [13]:
# Etichettare i dipendenti che guadagnano più di 2000 euro come "HIGH" e gli altri come "LOW"
categorie_df = dipendenti_df.withColumn("Categoria", F.when(dipendenti_df["Stipendio"]>2000, "HIGH").otherwise("LOW"))
categorie_df.show()

+--------+--------+---------+---------+---------+
|    Nome| Cognome|Reparto_d|Stipendio|Categoria|
+--------+--------+---------+---------+---------+
|  Andrea|  Assini|       IT|     1500|      LOW|
|   Bruno| Bettini|       IT|     2000|      LOW|
|   Carlo|Cicerini|       IT|     1800|      LOW|
|   Dario|  Desini|       HR|     1200|      LOW|
|   Elena| Ettorre|       HR|     1600|      LOW|
|Federica| Fierini|       HR|     2200|     HIGH|
|  Giulia|Giorgini|       HR|     1400|      LOW|
|   Ivana|  Itteri|      CEO|     3000|     HIGH|
+--------+--------+---------+---------+---------+



In [14]:
# Stipendio medio per reparto
stipendi_medi_df = dipendenti_df.groupBy("Reparto_d").agg(F.avg("Stipendio").alias("Stipendio Medio"))
stipendi_medi_df = stipendi_medi_df.withColumn("Stipendio Medio", F.format_number("Stipendio Medio", 2))
stipendi_medi_df.show()

+---------+---------------+
|Reparto_d|Stipendio Medio|
+---------+---------------+
|       HR|       1,600.00|
|      CEO|       3,000.00|
|       IT|       1,766.67|
+---------+---------------+



In [15]:
# Totale stipendi per reparto (Over Partition)
stipendi_reparto_df = dipendenti_df.withColumn("Stipendio Medio Reparto", F.sum("Stipendio").over(W.partitionBy("Reparto_d")))
stipendi_reparto_df.show()

+--------+--------+---------+---------+-----------------------+
|    Nome| Cognome|Reparto_d|Stipendio|Stipendio Medio Reparto|
+--------+--------+---------+---------+-----------------------+
|   Ivana|  Itteri|      CEO|     3000|                   3000|
|   Dario|  Desini|       HR|     1200|                   6400|
|   Elena| Ettorre|       HR|     1600|                   6400|
|Federica| Fierini|       HR|     2200|                   6400|
|  Giulia|Giorgini|       HR|     1400|                   6400|
|  Andrea|  Assini|       IT|     1500|                   5300|
|   Bruno| Bettini|       IT|     2000|                   5300|
|   Carlo|Cicerini|       IT|     1800|                   5300|
+--------+--------+---------+---------+-----------------------+

