# **Library and Dataset :**

## **Installation of PySpark :**

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## **Importation of PySpark :**

In [None]:
from pyspark import SparkContext

# Reuse the running SparkContext when there is one: calling SparkContext()
# a second time (for example when this cell is re-run) raises an error because
# only one context may be active per process.
sc = SparkContext.getOrCreate()

## **Importation of the dataset :**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Create (or reuse) the SparkSession used by every later cell
spark = SparkSession.builder.appName("Import CSV").getOrCreate()

# Interactive menu: keep asking until both CSV files are loaded (choice 2)
# or the user leaves (choice 3). Choice 1 only uploads files to the Colab
# runtime and then shows the menu again.
rep = True
while rep:
  print("1-Upload a file (If you are using Colab)")
  print("2-Load files")
  print("3-Leave")
  try:
    n = int(input("Make your choice"))
  except ValueError:
    # A non-numeric answer used to abort the whole cell; ask again instead.
    print("Invalid choice, please type 1, 2 or 3")
    continue
  if n == 1:
    # Imported here because google.colab only exists on Colab runtimes
    from google.colab import files
    uploaded = files.upload()
  elif n == 2:
    # Main dataset: first line is the header, column types are inferred
    f_name = input("Enter file location")
    df = spark.read.csv(f_name, header=True, inferSchema=True)
    # Dataset whose rows are merged into the main one later in the notebook
    f_name_fusion = input("Enter fusion file location")
    df_fusion = spark.read.csv(f_name_fusion, header=True, inferSchema=True)
    rep = False
  elif n == 3:
    # NOTE: leaving without loading means `df` / `df_fusion` are not defined
    # by this cell, so the following cells cannot run.
    rep = False

1-Upload a file (If you are using Colab)
2-Load files
3-Leave
Make your choice1


Saving property_data.csv to property_data (1).csv
Saving property_data_for_fusion.csv to property_data_for_fusion (1).csv
1-Upload a file (If you are using Colab)
2-Load files
3-Leave
Make your choice2
Enter file locationproperty_data.csv
Enter fusion file locationproperty_data_for_fusion.csv


### **Replacing the different textual representations of missing values (NA, NaN, n/a, --) with None :**

In [None]:
# Text placeholders that stand for a missing value; each one becomes a real null
missing_markers = ["NA", "NaN", "na", "n/a", "--"]
df = df.replace(missing_markers, None)

# **Dataset Generalisation :**

## **Library Importation :**

In [None]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import sum, avg, when, isnull, col, row_number, lit
from pyspark.sql import Window
from datetime import datetime

## **Number of NaN values :**

In [None]:
# Build a per-column summary: null count, null percentage, Spark type and the
# cleaning method derived from the percentage.
# `sum` is the pyspark.sql.functions aggregate imported above, not the builtin.
null_count_exprs = [sum(df[name].isNull().cast("int")).alias(name) for name in df.columns]
Num_NaN = df.select(*null_count_exprs).toPandas()
Num_NaN.index = ["Number of NaN"]
# Transpose so that each dataset column becomes one row of the summary
Num_NaN = Num_NaN.T

total_rows = df.count()
Num_NaN["Percentage"] = Num_NaN["Number of NaN"] * 100 / total_rows
Num_NaN["Type"] = [spark_type.upper() for _, spark_type in df.dtypes]

# Below 10 % of nulls -> DROP, 20 % or more -> COUNT, anything else -> AVG
null_share = Num_NaN["Percentage"]
count_or_avg = np.where(null_share >= 20, "COUNT", "AVG")
Num_NaN["Method"] = np.where(null_share < 10, "DROP", count_or_avg)
Num_NaN

Unnamed: 0,Number of NaN,Percentage,Type,Method
PID,0,0.0,INT,DROP
ST_NUM,2,22.222222,STRING,COUNT
ST_NAME,0,0.0,STRING,DROP
OWN_OCCUPIED,1,11.111111,STRING,AVG
NUM_BEDROOMS,3,33.333333,STRING,COUNT
NUM_BATH,1,11.111111,STRING,AVG
SQ_FT,2,22.222222,STRING,COUNT


### **Quick look at our dataset's first rows :**

In [None]:
# Print the first rows of the dataset as a text table (null markers already replaced)
df.show()

+-----+------+----------+------------+------------+--------+-----+
|  PID|ST_NUM|   ST_NAME|OWN_OCCUPIED|NUM_BEDROOMS|NUM_BATH|SQ_FT|
+-----+------+----------+------------+------------+--------+-----+
|10001|   104|    PUTNAM|           Y|           3|       1| 1000|
|10002|   197| LEXINGTON|           N|           3|     1.5| null|
|10003|  null| LEXINGTON|           N|        null|       1|  850|
|10004|   201|  BERKELEY|          12|           1|    null|  700|
|10005|   203|  BERKELEY|           Y|           3|       2| 1600|
|10006|   207|  BERKELEY|           Y|        null|       1|  800|
|10007|  null|WASHINGTON|        null|           2|  HURLEY|  950|
|10008|   213|   TREMONT|           Y|           1|       1| null|
|10009|   215|   TREMONT|           Y|        null|       2| 1800|
+-----+------+----------+------------+------------+--------+-----+



# **Data Cleaning :**

## **Replacing NaN Values :**

### **Creating a list with the cleaning method chosen for each column :**

In [None]:
# Flatten the summary table into parallel lists, then into one
# [column name, cleaning method, Spark type] entry per column.
L_Index = list(Num_NaN["Method"].index)
L_Method = list(Num_NaN["Method"])
L_Type = list(Num_NaN["Type"])
LF_Method = [
    [column_name, method, spark_type]
    for column_name, method, spark_type in zip(L_Index, L_Method, L_Type)
]
print(LF_Method)

[['PID', 'DROP', 'INT'], ['ST_NUM', 'COUNT', 'STRING'], ['ST_NAME', 'DROP', 'STRING'], ['OWN_OCCUPIED', 'AVG', 'STRING'], ['NUM_BEDROOMS', 'COUNT', 'STRING'], ['NUM_BATH', 'AVG', 'STRING'], ['SQ_FT', 'COUNT', 'STRING']]


### **Dropping NaN Values :**

In [None]:
# For every column tagged DROP, remove the rows where that column is null
columns_to_drop_on = [entry[0] for entry in LF_Method if entry[1] == "DROP"]
for column_name in columns_to_drop_on:
    df = df.dropna(subset=[column_name])

### **Creating lists containing the values and the distinct values of each column of the dataset :**

In [None]:
# Build three lists that are parallel to LF_Method (one entry per column):
#   L              every non-null value of the column, duplicates kept
#   L_Distinct_NA  distinct values of the column, None included when present
#   L_Distinct     distinct values of the column without None
#
# The rows are collected once and reused for every column; the previous
# version launched two Spark jobs per column. Distinct values are kept in
# order of first appearance, so the lists are identical from one run to the
# next.
column_names = [entry[0] for entry in LF_Method]
collected_rows = df.select(*column_names).collect()

L_Distinct = []
L_Distinct_NA = []
L = []
for column_name in column_names:
  column_values = [row[column_name] for row in collected_rows]
  # dict.fromkeys removes duplicates while preserving first-seen order
  distinct_values = list(dict.fromkeys(column_values))
  L_Distinct_NA.append(distinct_values)
  L_Distinct.append([value for value in distinct_values if value is not None])
  L.append([value for value in column_values if value is not None])

### **Replacing NaN values with the column average (numeric columns), with a random existing value of the same column (non-numeric columns, AVG method) or with the most frequent value of the column (non-numeric columns, COUNT method) :**

In [None]:
# Fill the remaining nulls column by column, following the method stored in
# LF_Method ([column name, method, Spark type]):
#   numeric column + AVG       -> column average
#   non-numeric column + AVG   -> one existing value of the column, picked at random
#   non-numeric column + COUNT -> most frequent value of the column
# NOTE: a numeric column tagged COUNT is left untouched, as in the original cell.
numeric_types = ("INT", "DOUBLE", "FLOAT")
for k, (column_name, method, column_type) in enumerate(LF_Method):
  if column_type in numeric_types:
    if method == "AVG":
      mean_col = df.agg(avg(column_name)).first()[0]
      df = df.withColumn(column_name, when(isnull(column_name), mean_col).otherwise(col(column_name)))
  elif method == "AVG":
    candidates = L_Distinct[k]
    if not candidates:
      # Column holds only nulls: there is no existing value to pick from
      continue
    # randint(n) draws from 0 .. n-1. The previous `len(...)+1` bound could
    # return len(candidates) and raise IndexError on the lookup below.
    n = np.random.randint(len(candidates))
    df = df.withColumn(column_name, when(isnull(column_name), candidates[n]).otherwise(col(column_name)))
  elif method == "COUNT":
    values = L[k]
    if not values:
      # Column holds only nulls: no most-frequent value exists
      continue
    # Count the occurrences of this column only, in a single pass
    occurrences = {}
    for value in values:
      occurrences[value] = occurrences.get(value, 0) + 1
    max_count = max(occurrences.values())
    # On a tie, the value that first appears latest in the column wins
    # (same tie-break as before: the last matching key overwrites the others)
    for cle, valeur in occurrences.items():
      if valeur == max_count:
        max_count_index = cle
    df = df.withColumn(column_name, when(isnull(column_name), max_count_index).otherwise(col(column_name)))

### **Quick look at the cleaned dataset :**

In [None]:
# Print the first rows of the dataset after the null-handling steps above
df.show()

+-----+------+----------+------------+------------+--------+-----+
|  PID|ST_NUM|   ST_NAME|OWN_OCCUPIED|NUM_BEDROOMS|NUM_BATH|SQ_FT|
+-----+------+----------+------------+------------+--------+-----+
|10001|   104|    PUTNAM|           Y|           3|       1| 1000|
|10002|   197| LEXINGTON|           N|           3|     1.5| 1800|
|10003|   215| LEXINGTON|           N|           3|       1|  850|
|10004|   201|  BERKELEY|          12|           1|  HURLEY|  700|
|10005|   203|  BERKELEY|           Y|           3|       2| 1600|
|10006|   207|  BERKELEY|           Y|           3|       1|  800|
|10007|   215|WASHINGTON|           Y|           2|  HURLEY|  950|
|10008|   213|   TREMONT|           Y|           1|       1| 1800|
|10009|   215|   TREMONT|           Y|           3|       2| 1800|
+-----+------+----------+------------+------------+--------+-----+



# **Data Fusion :**

### **Choosing the primary-key column :**

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
# Name of the key column, typed by the user. Later cells use it to order the
# BackEnd rows and to match rows between the dataset and the fusion file.
dataset_ID=input("type the column name of the primary key")

type the column name of the primary keyPID


### **Transforming our dataframe into temporary table :**

In [None]:
# Expose both DataFrames to Spark SQL under fixed view names:
# "my_table" for the cleaned dataset, "my_table_fusion" for the fusion file.
df.createOrReplaceTempView("my_table")
df_fusion.createOrReplaceTempView("my_table_fusion")

### **Creating a list of the fields of study in which a fusion can involve sensitive data :**

In [None]:
# Lower-case field names that trigger a confirmation prompt in Fusion();
# the user's answer is lower-cased before being looked up in this list.
Fields=["finance","bank","banks","insurance","health","medical"]

### **Creating a BackEnd DataFrame :**

In [None]:
# Copy the cleaned dataset into a second Spark DataFrame by round-tripping
# through pandas (the whole dataset is pulled into the notebook process).
# NOTE(review): presumably done to get a frame independent of df's query plan;
# column types may change in the pandas conversion — TODO confirm.
df_Bis=df.toPandas()
df_BackEnd=spark.createDataFrame(df_Bis)

### **Adding the row number :**

In [None]:
# Number the rows in ascending order of the primary-key column.
# The window has no partitionBy, so the numbering runs over the whole frame.
df_BackEnd = df_BackEnd.withColumn("row_number", row_number().over(Window.orderBy(dataset_ID)))

### **Adding the current date :**

In [None]:
# Stamp every row with the time at which this cell runs, stored as a
# "YYYY-MM-DD HH:MM:SS" string (same literal value for all rows).
df_BackEnd = df_BackEnd.withColumn("modified_date", lit(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))

### **Creating the following sql table :**

In [None]:
# Make the BackEnd (history) DataFrame queryable with Spark SQL as "my_table_BackEnd"
df_BackEnd.createOrReplaceTempView("my_table_BackEnd")

### **Fusion Algorithm :**

In [None]:
def _apply_fusion_rows(visible_df, backend_df):
  """Apply every row of the ``my_table_fusion`` view to both DataFrames.

  For each key found in the fusion view:
    * in ``visible_df`` the non-key columns of the row with that key are
      overwritten with the fusion values;
    * in ``backend_df`` one existing row with that key is duplicated, the copy
      receives the fusion values and a fresh ``modified_date``, and
      ``row_number`` is recomputed, so the previous version of the row stays
      available for traceability.

  Keys that are null or that do not exist in ``backend_df`` are skipped
  (the previous implementation crashed on them).

  Parameters:
    visible_df: Spark DataFrame shown to the user (one row per key).
    backend_df: Spark DataFrame holding the history; must contain the
      ``row_number`` and ``modified_date`` columns.

  Returns:
    tuple (updated visible DataFrame, updated BackEnd DataFrame).
  """
  # Every column except the key is overwritten. Filtering by name (instead of
  # slicing off the first column) also works when the key is not column 0.
  columns_to_change = [name for name in visible_df.columns if name != dataset_ID]

  # Read the fusion data once instead of running one SQL query per cell.
  fusion_rows = spark.table("my_table_fusion").select(dataset_ID, *columns_to_change).collect()

  # When a key appears several times in the fusion file, the values of its
  # first row are used each time, as the per-key `.first()` lookup did before.
  first_row_by_id = {}
  for fusion_row in fusion_rows:
    first_row_by_id.setdefault(fusion_row[dataset_ID], fusion_row)

  for fusion_row in fusion_rows:
    row_id = fusion_row[dataset_ID]
    if row_id is None:
      continue
    new_values = first_row_by_id[row_id]

    # Duplicate one existing BackEnd row for this key, then renumber the rows.
    row_to_duplicate = backend_df.filter(backend_df[dataset_ID] == row_id).limit(1)
    backend_df = backend_df.union(row_to_duplicate)
    backend_df = backend_df.withColumn("row_number", row_number().over(Window.orderBy(dataset_ID)))

    # Row numbers currently carried by this key. A column comparison is used
    # instead of string-built SQL so that non-numeric keys work as well.
    matching_rows = backend_df.filter(backend_df[dataset_ID] == row_id).select("row_number").collect()
    row_numbers = [matching_row["row_number"] for matching_row in matching_rows]
    if not row_numbers:
      print("Skipping " + dataset_ID + "=" + str(row_id) + " : no such row in the dataset")
      continue
    # NOTE(review): rows sharing a key are ties for the window ordering, so
    # which of them carries the highest row number is decided by Spark; the
    # code assumes it is the copy that was just appended — TODO confirm.
    target_row_number = row_numbers[-1]

    for column_name in columns_to_change:
      new_value = new_values[column_name]
      visible_df = visible_df.withColumn(column_name, when(visible_df[dataset_ID] == row_id, new_value).otherwise(visible_df[column_name]))
      backend_df = backend_df.withColumn(column_name, when(backend_df["row_number"] == target_row_number, new_value).otherwise(backend_df[column_name]))

    # Only the new version of the row gets a fresh timestamp
    modification_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    backend_df = backend_df.withColumn("modified_date", when(backend_df["row_number"] == target_row_number, lit(modification_time)).otherwise(backend_df["modified_date"]))

  return visible_df, backend_df


def Fusion():
  """Interactive menu that merges the fusion file into the dataset.

  Sets the globals ``res_fusion`` (visible dataset) and ``res_fusion_BackEnd``
  (history dataset). Both start as ``df`` / ``df_BackEnd`` and are replaced by
  the merged versions only when the user performs a fusion. When the field of
  study typed by the user is listed in ``Fields``, an explicit confirmation is
  requested first; any answer other than Y/y cancels the fusion.

  The menu is shown again after an invalid or unknown choice and ends after
  one fusion attempt or when the user chooses to leave.
  """
  global res_fusion
  global res_fusion_BackEnd
  res_fusion = df
  res_fusion_BackEnd = df_BackEnd

  while True:
    print("1-Make a fusion")
    print("2-Leave")
    try:
      n1 = int(input("Make your decision : "))
    except ValueError:
      # A non-numeric answer used to abort the function; ask again instead.
      print("Invalid choice, please type 1 or 2")
      continue
    if n1 == 2:
      return
    if n1 != 1:
      continue

    # Ask for the field of study to know whether the fusion could be delicate
    rep_type = input("Enter the field of study for this dataset")
    if rep_type.lower() in Fields:
      # Make the user acknowledge the risk before touching such a dataset
      rep_Bis = input("Your dataset could contain delicate data. Are you sure that you still want to continue the fusion process? (Y/N)")
      if rep_Bis.upper() != 'Y':
        return

    res_fusion, res_fusion_BackEnd = _apply_fusion_rows(res_fusion, res_fusion_BackEnd)
    # Keep the SQL view in sync with the final BackEnd frame
    res_fusion_BackEnd.createOrReplaceTempView("my_table_BackEnd")
    return

### **Making the fusion :**

In [None]:
# Run the interactive fusion; it stores its results in the globals
# res_fusion and res_fusion_BackEnd.
Fusion()
# Adopt the results as the current dataset / history frames
df=res_fusion
df_BackEnd=res_fusion_BackEnd
# Re-register the SQL views so that later queries see the merged data
df.createOrReplaceTempView("my_table")
df_BackEnd.createOrReplaceTempView("my_table_BackEnd")

1-Make a fusion
2-Leave
Make your decision : 1
Enter the field of study for this datasetfinance
Your dataset could contain delicate data. Are you sure that you still want to continue the fusion process? (Y/N)y


### **Let's have a quick look at the Dataset :**

In [None]:
# Print the first rows of the dataset after the fusion
df.show()

+-----+------+----------+------------+------------+--------+-----+
|  PID|ST_NUM|   ST_NAME|OWN_OCCUPIED|NUM_BEDROOMS|NUM_BATH|SQ_FT|
+-----+------+----------+------------+------------+--------+-----+
|10001|   104|    PUTNAM|           Y|           3|       1| 1000|
|10002|   197| LEXINGTON|           N|           3|     1.5| 1800|
|10003|  TEST|      TEST|        TEST|        TEST|    TEST| TEST|
|10004|   201|  BERKELEY|          12|           1|  HURLEY|  700|
|10005|  TEST|      TEST|        TEST|        TEST|    TEST| TEST|
|10006|   207|  BERKELEY|           Y|           3|       1|  800|
|10007|   215|WASHINGTON|           Y|           2|  HURLEY|  950|
|10008|   213|   TREMONT|           Y|           1|       1| 1800|
|10009|  TEST|      TEST|        TEST|        TEST|    TEST| TEST|
+-----+------+----------+------------+------------+--------+-----+



## **Traceability :**

### **Let's look at the BackEnd Dataset :**

In [None]:
# Print the first rows of the history frame (includes row_number and modified_date)
df_BackEnd.show()

+-----+------+----------+------------+------------+--------+-----+----------+-------------------+
|  PID|ST_NUM|   ST_NAME|OWN_OCCUPIED|NUM_BEDROOMS|NUM_BATH|SQ_FT|row_number|      modified_date|
+-----+------+----------+------------+------------+--------+-----+----------+-------------------+
|10001|   104|    PUTNAM|           Y|           3|       1| 1000|         1|2023-03-18 21:13:13|
|10002|   197| LEXINGTON|           N|           3|     1.5| 1800|         2|2023-03-18 21:13:13|
|10003|   215| LEXINGTON|           N|           3|       1|  850|         3|2023-03-18 21:13:13|
|10003|  TEST|      TEST|        TEST|        TEST|    TEST| TEST|         4|2023-03-18 21:13:29|
|10004|   201|  BERKELEY|          12|           1|  HURLEY|  700|         5|2023-03-18 21:13:13|
|10005|   203|  BERKELEY|           Y|           3|       2| 1600|         6|2023-03-18 21:13:13|
|10005|  TEST|      TEST|        TEST|        TEST|    TEST| TEST|         7|2023-03-18 21:13:42|
|10006|   207|  BERK

In [None]:
# List every BackEnd row stored for one key.
# NOTE: the column name PID and the value 10009 are hard-coded for the sample
# dataset; they are not derived from dataset_ID.
request=spark.sql("select * from my_table_BackEnd where PID=10009")
request.show()

+-----+------+-------+------------+------------+--------+-----+----------+-------------------+
|  PID|ST_NUM|ST_NAME|OWN_OCCUPIED|NUM_BEDROOMS|NUM_BATH|SQ_FT|row_number|      modified_date|
+-----+------+-------+------------+------------+--------+-----+----------+-------------------+
|10009|   215|TREMONT|           Y|           3|       2| 1800|        11|2023-03-18 21:13:13|
|10009|  TEST|   TEST|        TEST|        TEST|    TEST| TEST|        12|2023-03-18 21:13:34|
+-----+------+-------+------------+------------+--------+-----+----------+-------------------+



# **Data Delivery :**

In [None]:
from google.colab import files

# Bring the merged dataset into pandas so it can be written as a single CSV file
df_pandas = df.toPandas()

# Write the CSV on the notebook runtime, without the pandas index column
output_csv = "merged_dataset.csv"
df_pandas.to_csv(output_csv, index=False)

# Send the file from the Colab runtime to the user's browser
files.download(output_csv)

# **SparkSQL :**

### **Line Break function :**

In [None]:
def Line_Break(n):
  """Return a string made of ``n`` newline characters (empty when n <= 0)."""
  return '\n' * n

### **Automatized SparkSQL :**

In [None]:
# History of the session: one [query text, result DataFrame] entry per query made in SQL_Query()
Queries=[]
def SQL_Query():
  """Interactive Spark SQL console.

  Shows a menu in a loop until the user chooses to leave:
    1 - run a query with ``spark.sql`` and store ``[query, result]`` in ``Queries``
    2 - show the result of the most recent query
    3 - list every stored query with its ID (IDs start at 1)
    4 - show the result of the query with a given ID
    5 - leave

  Invalid menu choices, invalid or unknown query IDs, an empty history and
  queries rejected by Spark are reported with a message and the menu is shown
  again; previously each of these cases ended the loop with an exception.
  """
  while True:
    print("1-Make a query")
    print("2-Show result")
    print("3-Show all queries")
    print("4-Show a particular result")
    print("5-Leave")
    try:
      n = int(input("Make your choice : " + Line_Break(2)))
    except ValueError:
      print("Invalid choice, please type a number between 1 and 5")
      continue

    if n == 1:
      query = input(Line_Break(2) + "Write your query : " + Line_Break(2))
      try:
        result = spark.sql(query)
      except Exception as error:
        # Report the problem (syntax error, unknown table, ...) and keep the
        # console alive; a failed query is not added to the history.
        print("The query could not be run : " + str(error))
        continue
      Queries.append([query, result])
    elif n == 2:
      print(Line_Break(2))
      if Queries:
        Queries[-1][1].show()
      else:
        print("No query has been made yet")
      print(Line_Break(2))
    elif n == 3:
      print(Line_Break(2))
      for query_id, (query_text, _) in enumerate(Queries, start=1):
        print("Query (ID=" + str(query_id) + ") : " + query_text, end="\n")
      if Queries:
        print(Line_Break(2))
    elif n == 4:
      try:
        n1 = int(input("\n\n" + "Type the query ID : " + "\n\n"))
      except ValueError:
        print("Invalid query ID")
        continue
      # IDs are 1-based; 0 and negative numbers used to wrap around silently
      if 1 <= n1 <= len(Queries):
        Queries[n1 - 1][1].show()
      else:
        print("There is no query with ID " + str(n1))
    elif n == 5:
      return
# Start the interactive SQL menu
SQL_Query()