# Analisi dati Opencoesione

Esecuzione di analisi della banca dati Opencoesione circa i progetti relativi al ciclo di programmazione 2014/2020 aventi come tema sintetico quello della *Ricerca ed innovazione".

## Gruppo di lavoro:

* Lanciotti Antonio
* Di Gaetano Anna
* Olivieri Davide
* D'Agostino Lorenzo

### Target: Regione Marche
Andranno individuati i progetti che ricadono nelle aree tematiche della S3 regionale e di Impresa 4.0 (*leggere [README](https://github.com/LorenzoDag/Economia#readme)*)

### Import librerie ed inizializzazione sessione

In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Economia").getOrCreate()

### Upload dataset
**Nota**: Cambiare i path

In [14]:
path_localizzazioni = "hdfs://localhost:9000/localizzazioni.csv"
path_progetti = "hdfs://localhost:9000/progetti.csv"
path_soggetti = "hdfs://localhost:9000/soggetti.csv"

localizzazioni = spark.read.option("delimiter", ";").option("header", True).csv(path_localizzazioni)
localizzazioni.createOrReplaceTempView("localizzazioni") # Rende la tabella visibile a spark (per query)
progetti = spark.read.option("delimiter", ";").option("header", True).csv(path_progetti)
progetti.createOrReplaceTempView("progetti")             # Rende la tabella visibile a spark (per query)
soggetti = spark.read.option("delimiter", ";").option("header", True).csv(path_soggetti)
soggetti.createOrReplaceTempView("soggetti")             # Rende la tabella visibile a spark (per query)

### Definizione e consecutiva esecuzione query
Seleziona le colonne descritte nel [README](https://github.com/LorenzoDag/Economia#readme) ed esegue una join on *COD_LOCALE_PROGETTO*

In [15]:
select_loc = "localizzazioni.COD_LOCALE_PROGETTO, localizzazioni.DEN_REGIONE, localizzazioni.DEN_PROVINCIA, localizzazioni.DEN_COMUNE, localizzazioni.INDIRIZZO_PROG"
select_prog = "progetti.OC_TITOLO_PROGETTO, progetti.OC_SINTESI_PROGETTO, progetti.DESCRIZIONE_GRANDE_PROGETTO, progetti.CUP_DESCR_CATEGORIA, progetti.CUP_DESCR_NATURA,progetti.CUP_DESCR_TIPOLOGIA,progetti.CUP_DESCR_SETTORE,progetti.CUP_DESCR_SOTTOSETTORE,progetti.OC_DATA_INIZIO_PROGETTO,progetti.FINANZ_TOTALE_PUBBLICO"
select_sogg = "soggetti.COD_ATECO_SOGG, soggetti.SOGG_DESCR_RUOLO, soggetti.OC_CODICE_FISCALE_SOGG, soggetti.OC_DENOMINAZIONE_SOGG, soggetti.INDIRIZZO_SOGG, soggetti.CAP_SOGG"

where_prog = "progetti.OC_COD_CICLO = 2 AND progetti.OC_COD_TEMA_SINTETICO = 1"
where_loc = "LOWER(localizzazioni.DEN_REGIONE) LIKE '%marche%'"

QUERY = "SELECT " + select_loc + ", " + select_prog + ", " + select_sogg + " FROM localizzazioni INNER JOIN progetti ON localizzazioni.COD_LOCALE_PROGETTO = progetti.COD_LOCALE_PROGETTO INNER JOIN soggetti ON localizzazioni.COD_LOCALE_PROGETTO = soggetti.COD_LOCALE_PROGETTO WHERE " + where_loc + " AND " + where_prog

In [16]:
result_dataframe = spark.sql(QUERY)
result_dataframe.createOrReplaceTempView("result")
spark.catalog.cacheTable("result")

### Definizione funzione filtro
input: 
> * dataframe_name : (String) nome della tabella cachata
> * testo : (String / List of string) testo da filtrare all'interno della tabella

output:
> * None : se testo vuoto
> * dataframe filtrato : (pyspark.sql.dataframe.DataFrame) dataframe contenente le righe che contengono testo

In [46]:
from functools import reduce
from pyspark.sql import DataFrame

def filterDataframeForTech(dataframe_name, testo=''):
    if not testo: return None
    if not isinstance(testo, list):
        testo = testo.lower()
        return spark.sql(f"SELECT * FROM {dataframe_name} WHERE LOWER(DESCRIZIONE_GRANDE_PROGETTO) LIKE '%{testo}%' OR LOWER(OC_TITOLO_PROGETTO) LIKE '%{testo}%' OR LOWER(OC_SINTESI_PROGETTO) LIKE '%{testo}%' OR LOWER(CUP_DESCR_NATURA) LIKE '%{testo}%' OR LOWER(CUP_DESCR_TIPOLOGIA) LIKE '%{testo}%' OR LOWER(CUP_DESCR_SETTORE) LIKE '%{testo}%' OR LOWER(CUP_DESCR_SOTTOSETTORE) LIKE '%{testo}%' OR LOWER(CUP_DESCR_CATEGORIA) LIKE '%{testo}%'")
    df_list = []
    for word in testo:
        df_list.append(spark.sql(f"SELECT * FROM {dataframe_name} WHERE LOWER(DESCRIZIONE_GRANDE_PROGETTO) LIKE '%{word}%' OR LOWER(OC_TITOLO_PROGETTO) LIKE '%{word}%' OR LOWER(OC_SINTESI_PROGETTO) LIKE '%{word}%' OR LOWER(CUP_DESCR_NATURA) LIKE '%{word}%' OR LOWER(CUP_DESCR_TIPOLOGIA) LIKE '%{word}%' OR LOWER(CUP_DESCR_SETTORE) LIKE '%{word}%' OR LOWER(CUP_DESCR_SOTTOSETTORE) LIKE '%{word}%' OR LOWER(CUP_DESCR_CATEGORIA) LIKE '%{word}%'"))
    return reduce(DataFrame.unionAll, df_list)

# Filtraggio del dataframe
L'obiettivo e' quello di filtrare le righe del dataframe contenenti parole chiave riconducibili a progetti inerenti le dimensioni tematiche descritte nel file [README](https://github.com/LorenzoDag/Economia#readme) ed i progetti di **industria 4.0**

### Definizione parole chiave

In [47]:
s3_list = ["industria intelligente", "ambiente", "salute", "alimentazione", "azienda digitale", "smart communities", "mobilità intelligente", "turismo", "patrimonio culturale", "aerospazio", "difesa"]
ind4_list = ["robot collaborativi", "3d", "stampa 3d", "realtà aumentata", "simulazione", "simulazioni", "integrazione verticale", "integrazione orizzontale", "internet of things", "iot", "cloud computing", "cloud", "cybersecurity", "sicurezza informatica", "big data", "analytics"]

### Filtraggio del dataframe

In [50]:
s3_df = filterDataframeForTech("result", s3_list)
ind4_df = filterDataframeForTech("result", ind4_list)

## Opzionale!
Salvataggio datasets.

**Nota**: Cambiare i path

In [54]:
main_path = "/home/hadoop_user/Scrivania/github_economia/Economia/"
if main_path: # Solo se non vuoto
    result_dataframe.repartition(1).write.option("header", True).option("delimiter", ";").csv(main_path + "dataset_tot") # Dataset totale
    s3_df.repartition(1).write.option("header", True).option("delimiter", ";").csv(main_path + "dataset_tematiche_s3")# Dataset dimensioni tematiche s3
    ind4_df.repartition(1).write.option("header", True).option("delimiter", ";").csv(main_path + "dataset_industria_4.0")    

AnalysisException: path file:/home/hadoop_user/Scrivania/economia/dataset_tot already exists.