# Data Analysis & Analysis Storage

## Spark Configuration

### Download Spark

In [None]:
!apt install openjdk-8-jre-headless -qq > /dev/null
# download spark3.0.0
!wget -q http://apache.osuosl.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# unzip it
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
# install findspark 
!pip install -q findspark

### Library Import

In [None]:
import os
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import col, avg

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.clustering import BisectingKMeans

## Configuring and Starting the SparkSession

In [None]:
# Point the process at the Java runtime and at the unpacked Spark distribution
# before findspark looks them up.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.2.1-bin-hadoop3.2",
})

findspark.init()

# Reuse the active session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('Homework')
    .getOrCreate()
)

## Spark DataFrame creation using MongoDB

### MongoDB configuration

In [None]:
! python -m pip install pymongo==3.7.2

### Library Import

In [None]:
import datetime
import pymongo
from pymongo import MongoClient

### MongoDB connection

In [None]:
from urllib.parse import quote_plus

# Credentials can be supplied through the MONGO_USER / MONGO_PASSWORD
# environment variables so that they need not be stored in the notebook;
# the literal values are only fallbacks.
user = os.environ.get('MONGO_USER', 'prova')
psw = os.environ.get('MONGO_PASSWORD', 'prova')

# The three hosts of the replica set.
clusterA = 'cluster0-shard-00-00.pdlry.mongodb.net:27017'
clusterB = 'cluster0-shard-00-01.pdlry.mongodb.net:27017'
clusterC = 'cluster0-shard-00-02.pdlry.mongodb.net:27017'
attributes = '?ssl=true&replicaSet=atlas-ih4qpa-shard-0&authSource=admin&retryWrites=true&w=majority'

# Percent-escape the credentials: characters such as '@', ':' or '/' would
# otherwise break the URI.
uri = ('mongodb://' + quote_plus(user) + ':' + quote_plus(psw) + '@'
       + ','.join((clusterA, clusterB, clusterC)) + '/' + attributes)

client = MongoClient(uri)

collection = client.Esame.Esame

# Fields requested from every document of the collection.
projected_fields = ["Territorio", "Anno", "ResidenzaP", "AteneoNOME", "Isc",
                    "Internet", "Occupazione", "Biblioteche", "Punteggio",
                    "Tipo Ateneo", "Sesso", "Laureati", "Corso Laurea"]

# Empty filter: fetch all documents, each field projected as "$<field>".
coll_df = list(collection.find({}, {field: "$" + field for field in projected_fields}))

### From list to pandas DataFrame

In [None]:
import pandas as pd

# Build a pandas frame from the fetched documents, then discard the "_id" column.
mongo_frame = pd.DataFrame(coll_df)
df_pandas = mongo_frame.drop(columns=["_id"])

### From pandas DataFrame to Spark DataFrame

In [None]:
def equivalent_type(f):
    """Return the Spark SQL type that corresponds to the pandas dtype ``f``.

    ``f`` is compared against a fixed list of dtype names, in order; any dtype
    that matches none of them is mapped to ``StringType``.
    """
    dtype_to_spark = (
        ('datetime64[ns]', TimestampType),
        ('int64', LongType),
        ('int32', IntegerType),
        ('float64', DoubleType),
        ('float32', FloatType),
    )
    for dtype_name, spark_type in dtype_to_spark:
        if f == dtype_name:
            return spark_type()
    return StringType()

def define_structure(string, format_type):
    """Build the ``StructField`` describing one column.

    Args:
        string: Column name.
        format_type: pandas dtype of the column.

    Returns:
        A ``StructField`` named ``string`` whose type comes from
        ``equivalent_type``; if that lookup fails, the column falls back to
        ``StringType``.
    """
    try:
        spark_type = equivalent_type(format_type)
    except Exception:
        # Best-effort fallback. Catch Exception rather than everything so that
        # KeyboardInterrupt / SystemExit still propagate.
        spark_type = StringType()
    return StructField(string, spark_type)

def pandas_to_spark(pandas_df):
    """Convert a pandas DataFrame into a Spark DataFrame with an explicit schema.

    Every pandas column becomes one ``StructField`` built by
    ``define_structure`` from the column name and dtype. The frame is created
    through the notebook-level ``spark`` session.
    """
    fields = [
        define_structure(name, dtype)
        for name, dtype in zip(pandas_df.columns, pandas_df.dtypes)
    ]
    return spark.createDataFrame(pandas_df, StructType(fields))

# Spark DataFrame built from the MongoDB extract; the analyses below read from it.
Dataset = pandas_to_spark(df_pandas)

## Data Analysis

### 1. Checking whether the quality of the area indicates a higher number of enrolled students

In [None]:
by_territory = Dataset.groupBy("Territorio")

# Enrolled students summed per territory, ranked from most to fewest.
Iscxterr = by_territory.sum("Isc").withColumnRenamed("sum(Isc)", "Iscritti")
Iscxterrord = Iscxterr.sort(F.desc("Iscritti"))

# Score summed per territory, ranked from highest to lowest.
puntxterr = by_territory.sum("Punteggio").withColumnRenamed("sum(Punteggio)", "Punteggio")
puntxterrord = puntxterr.sort(F.desc("Punteggio"))

for ranking in (Iscxterrord, puntxterrord):
    ranking.show()

### 2. Region where it is most difficult to graduate

In [None]:
# Graduates summed per territory, ranked from most to fewest.
Lauxreg = Dataset.groupBy("Territorio").sum("Laureati").withColumnRenamed("sum(Laureati)", "Laureati")
Lauxregord = Lauxreg.orderBy("Laureati", ascending=False)

# Join by column name: both frames derive from Dataset, so a condition of the
# form left['Territorio'] == right['Territorio'] refers to the same underlying
# column on both sides. Joining on the name avoids that ambiguity and yields a
# single Territorio column.
Diff_laurea = Iscxterrord.join(Lauxregord, "Territorio").select("Territorio", "Iscritti", "Laureati")

# Graduates-to-enrolled ratio per territory, sorted from the highest ratio to
# the lowest (the territories with the fewest graduates per enrolled student
# come last).
Diff_laurea = Diff_laurea.withColumn("Rapporto", F.col("Laureati") / F.col("Iscritti"))
Diff_laurea = Diff_laurea.orderBy("Rapporto", ascending=False)
Diff_laurea.show()

### 3. University with the highest number of enrolled students out of the regional total

In [None]:
# Enrolled students per territory: the denominator of the ratio.
Iscxreg = Dataset.groupBy("Territorio").sum("Isc").withColumnRenamed("sum(Isc)", "Iscritti per Territorio")

# Enrolled students per university: the numerator of the ratio.
resultset = Dataset.groupBy("AteneoNOME").sum("Isc")

# One row per (university, territory) pair carrying the university total.
# Only these two key columns are needed, so the row-level columns are not
# carried through the join.
Iscxateneo = (
    Dataset.select("AteneoNOME", "Territorio").distinct()
    .join(resultset, "AteneoNOME")
    .withColumnRenamed("sum(Isc)", "Iscritti per Ateneo")
)

# Join with Iscxreg: it is the frame that exposes "Iscritti per Territorio",
# the column the ratio below divides by.
Num_isc_ateneo = Iscxateneo.join(Iscxreg, "Territorio")
Num_isc_ateneo = Num_isc_ateneo.withColumn(
    "Rapporto", F.col("Iscritti per Ateneo") / F.col("Iscritti per Territorio"))

# Keep the largest ratio of each university and rank universities by it.
Num_isc_ateneo = Num_isc_ateneo.groupBy("AteneoNOME").max("Rapporto")
Num_isc_ateneo = Num_isc_ateneo.orderBy("max(Rapporto)", ascending=False)
Num_isc_ateneo.show()

### 4. Male and female graduates by degree course

In [None]:
def _laureati_per_corso(frame, column_name):
    """Sum graduates per degree course in ``frame`` and rank the courses by that sum, descending."""
    totals = frame.groupBy("Corso Laurea").sum("Laureati").withColumnRenamed("sum(Laureati)", column_name)
    return totals.orderBy(column_name, ascending=False)


# Graduates per course: overall, male only, female only.
Lauxcorso = _laureati_per_corso(Dataset, "Laureati per Corso di Laurea")
Lauxcorsomasc = _laureati_per_corso(Dataset.where("Sesso = 'maschi'"), "Laureati per Corso di Laurea maschi")
Lauxcorsofemmine = _laureati_per_corso(Dataset.where("Sesso = 'femmine'"), "Laureati per Corso di Laurea femmine")

# Put the three totals side by side and derive each sex's share of the course
# total (a fraction, obtained by plain division).
totale_corso = F.col("Laureati per Corso di Laurea")
Lau_M_F = (
    Lauxcorsofemmine
    .join(Lauxcorsomasc, "Corso Laurea")
    .join(Lauxcorso, "Corso Laurea")
    .withColumn("Percentuale maschi", F.col("Laureati per Corso di Laurea maschi") / totale_corso)
    .withColumn("Percentuale femmine", F.col("Laureati per Corso di Laurea femmine") / totale_corso)
)

# The same table ranked by male share and by female share.
Lau_M = Lau_M_F.sort(F.desc("Percentuale maschi"))
Lau_F = Lau_M_F.sort(F.desc("Percentuale femmine"))

Lau_M.show()
Lau_F.show()

### 5. Degree course with the most graduates

In [None]:
# Name of the aggregated column, defined once so that the rename and the sort
# cannot drift apart (they previously differed in letter case, which only
# resolves while Spark's column matching is case-insensitive).
graduates_col = "Laureati per Corso Laurea"

# Graduates summed per degree course, ranked from most to fewest.
Lau_per_corso = Dataset.groupBy("Corso Laurea").sum("Laureati").withColumnRenamed("sum(Laureati)", graduates_col)
Lau_per_corso = Lau_per_corso.orderBy(graduates_col, ascending=False)
Lau_per_corso.show()

### 6. Degree course by region with the highest number of graduates

In [None]:
# Graduates summed per territory (not used further in this cell).
Lauxterr = Dataset.groupBy("Territorio").sum("Laureati").withColumnRenamed("sum(Laureati)", "Laureati per Territorio")

# Graduates summed per (territory, degree course) pair.
Corsixterr = (
    Dataset.groupBy("Territorio", "Corso Laurea")
    .sum("Laureati")
    .withColumnRenamed("sum(Laureati)", "Laureati per Corso e regione")
)

# Largest per-course total inside each territory; the territory column is
# renamed so it can be told apart after the join below.
Corsixterrmax = (
    Corsixterr.groupby("Territorio")
    .max("Laureati per Corso e regione")
    .withColumnRenamed("max(Laureati per Corso e regione)", "Laureati per Corso e regione")
    .withColumnRenamed("Territorio", "Territorio2")
)

# Match each maximum back to the course rows with the same total, keep only
# the matches from the same territory, and rank by the total.
Lau_corso_reg = (
    Corsixterrmax.join(Corsixterr, "Laureati per Corso e regione")
    .where("Territorio = Territorio2")
    .drop("Territorio2")
    .orderBy("Laureati per Corso e regione", ascending=False)
)
Lau_corso_reg.show()

## Machine Learning

### Remove undesired columns

In [None]:
# Territory-level indicator columns taken from the MongoDB extract.
# NOTE(review): assumes these four columns are numeric — confirm against the collection.
indicator_cols = ["Internet", "Occupazione", "Biblioteche", "Punteggio"]

# Columns that must not be used as clustering features.
excluded_cols = {"Territorio", "Punteggio", "Biblioteche"}

# One row per territory holding the mean of every indicator.
training = Dataset.groupBy("Territorio").agg(*[avg(name).alias(name) for name in indicator_cols])

# Pick the feature columns by name. Removing them by position with pop() depends
# on the exact column order and raises IndexError when the frame has fewer
# columns than expected.
lista = [name for name in training.columns if name not in excluded_cols]

### K-means Implementation

In [None]:
# Pack the selected feature columns into a single vector column "features".
vecAssembler = VectorAssembler(inputCols=lista, outputCol="features")
vector_df = vecAssembler.transform(Dataset)

# K-means with 3 clusters and a fixed seed.
kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(vector_df)

# Assign each row to a cluster and score the clustering.
predictions = model.transform(vector_df)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

centers = model.clusterCenters()
print("Cluster Centers: ")
for cluster_center in centers:
    print(cluster_center)

### K-means Results

In [None]:
# Cluster assigned by the K-means model to each row, shown next to its territory.
kmeans_assignments = model.transform(vector_df)
predizioneKM = kmeans_assignments.select("Territorio", "prediction")
predizioneKM.show()

### Gaussian Mixture Model Implementation

In [None]:
# Pack the selected feature columns into a single vector column "features".
# The input is Dataset, the same frame the K-means cell uses; the name
# Dataset2 is not defined anywhere in the notebook.
vecAssembler2 = VectorAssembler(inputCols=lista, outputCol="features")
vector_df2 = vecAssembler2.transform(Dataset)

# Gaussian mixture with 3 components and a fixed seed.
gmm = GaussianMixture().setK(3).setSeed(538009335)
modelGMM = gmm.fit(vector_df2)

print("Gaussians shown as a DataFrame: ")
modelGMM.gaussiansDF.show(truncate=False)

### Gaussian Mixture Model Results

In [None]:
# Mixture component assigned to each row, shown next to its territory.
gmm_assignments = modelGMM.transform(vector_df2)
predizioneGMM = gmm_assignments.select("Territorio", "prediction")
predizioneGMM.show()

### Bisecting K-means Implementation

In [None]:
# Pack the selected feature columns into a single vector column "features".
# The input is Dataset, the same frame the K-means cell uses; the name
# Dataset2 is not defined anywhere in the notebook.
vecAssembler3 = VectorAssembler(inputCols=lista, outputCol="features")
vector_df3 = vecAssembler3.transform(Dataset)

# Bisecting K-means with 3 clusters and a fixed seed.
bkm = BisectingKMeans().setK(3).setSeed(1)
modelBKM = bkm.fit(vector_df3)

# Use the model fitted just above (modelBKM) for both the predictions and the
# cluster centers.
predictions = modelBKM.transform(vector_df3)

evaluator = ClusteringEvaluator()

silhouetteBKM = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouetteBKM))

print("Cluster Centers: ")
centers = modelBKM.clusterCenters()
for center in centers:
    print(center)

### Bisecting K-means Results

In [None]:
# Cluster assigned by the bisecting K-means model to each row, shown next to its territory.
bkm_assignments = modelBKM.transform(vector_df3)
predizioneBKM = bkm_assignments.select("Territorio", "prediction")
predizioneBKM.show()

## Analysis Storage

### Creation of New Database for Results

In [None]:
# Handle to the database that receives the analysis results.
nuovodatabase = client.Risultati_esame

In [None]:
# NOTE(review): this cell held leftover LaTeX report scaffolding (\subsection
# headings with empty lstlisting environments). That markup is not valid Python
# and stops a full run with a SyntaxError, so it is kept here as comments only.
#
# Section titles from the report:
#   - Difficoltà di laurea per regione
#     (Degree difficulty per region)
#   - Atenei con più iscritti nella regione
#     (Universities with the most enrolled students in the region)
#   - Studenti maschi e femmine per corso di studi
#     (Male and female students per degree course)
#   - Corso di laurea con più laureati
#     (Degree course with the most graduates)
#   - Distribuzione dei laureati per corso di laurea nella regione
#     (Distribution of graduates per degree course in the region)

### Analysis 1 Storage: Score VS Enrolling

In [None]:
# Store both rankings of analysis 1, one MongoDB collection per ranking.
for collection_name, ranking in (
    ("Iscritti_per_territorio", Iscxterrord),
    ("Punteggio_per_territorio", puntxterrord),
):
    join_collection = nuovodatabase[collection_name]
    # Collect the Spark result to the driver once and turn every row into a dict.
    records = ranking.toPandas().to_dict('records')
    # insert_many rejects an empty list, so an empty result is simply skipped.
    if records:
        join_collection.insert_many(records)

### Analysis 2 Storage: Degree difficulty per region

In [None]:
# Store the graduates-to-enrolled ratio per territory.
join_collection = nuovodatabase["Regioni_per_difficoltà_di_laurea"]
# Collect the Spark result to the driver once and turn every row into a dict.
records = Diff_laurea.toPandas().to_dict('records')
# insert_many rejects an empty list, so an empty result is simply skipped.
if records:
    join_collection.insert_many(records)

### Analysis 3 Storage: Enrolled students per University

In [None]:
# Store each university's largest share of a territory's enrolled students.
join_collection = nuovodatabase["Rilevanza_ateneo_per_regione"]
# Collect the Spark result to the driver once and turn every row into a dict.
records = Num_isc_ateneo.toPandas().to_dict('records')
# insert_many rejects an empty list, so an empty result is simply skipped.
if records:
    join_collection.insert_many(records)

### Analysis 4 Storage: Male and Female by Degree

In [None]:
# Store the male/female graduate totals and shares per degree course.
join_collection = nuovodatabase["Maschi_e_femmine_per_corso_di_laurea"]
# Collect the Spark result to the driver once and turn every row into a dict.
records = Lau_M_F.toPandas().to_dict('records')
# insert_many rejects an empty list, so an empty result is simply skipped.
if records:
    join_collection.insert_many(records)

### Analysis 5 Storage: Courses with more graduates

In [None]:
# Store the graduates total per degree course.
join_collection = nuovodatabase["Corsi_con_più_laureati"]
# Collect the Spark result to the driver once and turn every row into a dict.
records = Lau_per_corso.toPandas().to_dict('records')
# insert_many rejects an empty list, so an empty result is simply skipped.
if records:
    join_collection.insert_many(records)

### Analysis 6 Storage: Top degree course per region by graduates

In [None]:
# Store, for each territory, the degree course(s) with the most graduates.
join_collection = nuovodatabase["Corsi_più_seguiti_per_regione"]
# Collect the Spark result to the driver once and turn every row into a dict.
records = Lau_corso_reg.toPandas().to_dict('records')
# insert_many rejects an empty list, so an empty result is simply skipped.
if records:
    join_collection.insert_many(records)