#Lutwaffe
###**Descrizione**
Il dataset riguarda luoghi e perdite dell’aviazione tedesca durante la seconda guerra mondiale.


#Setup environment

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://downloads.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

In [None]:
!ls

lutwaffe-posizioni.csv	 sample_data		  spark-3.3.1-bin-hadoop3.tgz
lutwaffe-variazioni.csv  spark-3.3.1-bin-hadoop3  spark-3.3.1-bin-hadoop3.tgz.1


In [None]:
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [None]:
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
sc = spark.sparkContext

#Import dataset
I due csv sono stati prima caricati in File

In [None]:
df_var = spark.read.csv("/content/lutwaffe-variazioni.csv",header=True,inferSchema=True)
df_var.show()

+--------------------+----------------+------------+--------+----------------+---------+----+-------+---------------+---------------+----+----------+-------------+----------------+----------------+---------+-------------------+
|          group_type| squadron_number|group_number|subgroup|           total|    model| add|add_new|add_maintenance|add_other_units|lost|lost_enemy|lost_accident|lost_maintenance|lost_other_units|total_eom|               date|
+--------------------+----------------+------------+--------+----------------+---------+----+-------+---------------+---------------+----+----------+-------------+----------------+----------------+---------+-------------------+
|   Aufklärungsgruppe|               1|          10|    null|               9|Fw 189A-1|   0|      0|              0|              0|   0|         0|            0|               0|               0|        9|1942-03-15 00:00:00|
|   Aufklärungsgruppe|               1|          10|    null|               9|Fw 189A-1|

In [None]:
df_pos = spark.read.csv("/content/lutwaffe-posizioni.csv",header=True,inferSchema=True)
df_pos.show()

+--------------------+---------------+------------+--------+----------------+----------------+--------------------+-------------------+
|          group_type|squadron_number|group_number|subgroup|             lat|             lon|           locations|               date|
+--------------------+---------------+------------+--------+----------------+----------------+--------------------+-------------------+
|      Jagdgeschwader|             NA|          27|    Stab|           30.25|          19.195|          El Agheila|1942-01-15 00:00:00|
|Sturzkampfgeschwader|             NA|           2|      II|        30.26132|        19.18885|El Aghelia, Agedabia|1941-04-15 00:00:00|
|      Jagdgeschwader|             NA|          27|      II|30.4583333333333|          18.575|    Arco Philaenorum|1942-01-15 00:00:00|
|      Jagdgeschwader|             NA|          27|      II|30.4583333333333|          18.575|    Arco Philaenorum|1942-11-15 00:00:00|
|Sturzkampfgeschwader|             NA|          

##Quesito 1: Qual è la media delle flotte trimestre per trimestre del numero totale degli apparecchi?

In [None]:
from pyspark.sql import functions as F

df_var.na.drop(how="any", subset=["total","date"])\
.withColumn("trimestre", F.quarter(df_var["date"]))\
.groupBy("trimestre").agg(F.avg('total').alias('media trimestrale')).show()

+---------+------------------+
|trimestre| media trimestrale|
+---------+------------------+
|        1| 7.225451807228915|
|        3| 7.255362384285392|
|        4| 7.904384384384384|
|        2|7.3834137376521936|
+---------+------------------+



##Quesito 2: Qual è la media delle flotte della percentuale di apparecchi persi in combattimento?

In [None]:
df_var.na.drop(how="any",subset=["total","lost_enemy"])\
.withColumn("percent_lost_enemy", df_var["lost_enemy"] * 100 / df_var["total"])\
.agg(F.mean('percent_lost_enemy').alias('media delle flotte della percentuale di apparecchi persi in combattimento')).show()

+-------------------------------------------------------------------------+
|media delle flotte della percentuale di apparecchi persi in combattimento|
+-------------------------------------------------------------------------+
|                                                        8.654453206568132|
+-------------------------------------------------------------------------+



##Quesito 3: Qual è la varianza delle variazioni a livello mensile degli apparecchi, sul totale delle flotte? (non è permesso usare le funzioni pre-fabbricate per calcolare la varianza)

In [None]:
df = df_var.na.drop(how="any",subset=["total","total_eom"])\
.withColumn("monthly_var", F.col("total_eom") - F.col("total"))

mean = df.agg(F.mean("monthly_var")).first()[0]

df.withColumn("squared_dev", F.pow(F.col("monthly_var") - mean, 2))\
.agg((F.sum("squared_dev") / (F.count("monthly_var") - 1))\
     .alias("varianza delle variazioni a livello mensile degli apparecchi")).show()

+------------------------------------------------------------+
|varianza delle variazioni a livello mensile degli apparecchi|
+------------------------------------------------------------+
|                                           40.07434902377563|
+------------------------------------------------------------+



##Quesito 4: Per ogni luogo censito (colonna locations) qual’è la distribuzione percentuale della tipologia di apparecchi (colonna group_type) che si sono trovati in quel luogo lungo tutto il corso del tempo?

In [None]:
from pyspark.sql.window import Window

window = Window.partitionBy("locations")

df_pos.join(df_var, ["group_type", "squadron_number", "group_number", "subgroup", "date"])\
.groupBy(["locations", "group_type"]).agg(F.sum("total").alias("group_count"))\
.withColumn("total_count_loc", F.sum("group_count").over(window))\
.withColumn("distribuzione percentuale", F.round((F.col("group_count") / F.col("total_count_loc")) * 100, 2)).show()

+----------------+--------------------+-----------+---------------+-------------------------+
|       locations|          group_type|group_count|total_count_loc|distribuzione percentuale|
+----------------+--------------------+-----------+---------------+-------------------------+
|         Aalborg|      Jagdgeschwader|       35.0|           35.0|                    100.0|
|    Aalborg-West|     Kampfgeschwader|        3.0|            3.0|                    100.0|
|Abbeville-Drucat|      Jagdgeschwader|      351.0|          351.0|                    100.0|
|          Achmer|     Kampfgeschwader|      116.0|          116.0|                    100.0|
|     Achtirskaja|Sturzkampfgeschwader|       34.0|           34.0|                    100.0|
|     Achtirskaya|Sturzkampfgeschwader|       88.0|           88.0|                    100.0|
|         Ahlhorn|     Kampfgeschwader|       16.0|           16.0|                    100.0|
|   Ain-el-Gazala|Sturzkampfgeschwader|       44.0|         

##Quesito 5: Qual è stato il mese più “letale” (quindi con più apparecchi persi in combattimento)?

In [None]:
from pyspark.sql.functions import month

df_var.na.drop(how="any",subset=["lost_enemy","date"])\
.select("date", "lost_enemy")\
.groupBy(month(df_var.date).alias("month")).agg(F.sum("lost_enemy").alias("total_lost_enemy"))\
.agg(F.max_by("month","total_lost_enemy").alias("Mese più 'letale'")).show()

+-----------------+
|Mese più 'letale'|
+-----------------+
|                7|
+-----------------+

