# Hadling large dataset by PySpark

## Requirements

In [24]:
!pip install pyspark
!pip install findspark
!pip install unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m14.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=b7c913bc9c46bd872d2ec6fd428861f5d476ecb870df608819b57e8bedf4b3f5
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

## Checking informatino session

In [25]:
import pandas as pd
import pyspark

import findspark
findspark.init()

In [26]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fd65d63ac10>


In [27]:
from pyspark import SparkContext
from pyspark.sql import SQLContext 

s_context = SparkContext.getOrCreate()
#s_context = SparkContext("local", "existencia_colmenas")
sql = SQLContext(s_context)



## Reading Excel files

In [13]:
import os

class FileReader:
  def __init__(self, path:str):
    self.path = path
    self.list_f = []
    self.collect_files()
  
  def collect_files(self):
    # Get the list of all files and directories
    self.list_f = os.listdir(self.path)
    self.list_f = [ self.path +'/'+ x for x in self.list_f ]

  def show_files(self):
    for x in range(len(self.list_f)):
      print(x+1, "  ", self.list_f[x])

In [19]:
folder = FileReader("./data_income")
folder.show_files()

1    ./data_income/numero-superficie-de-upa-forestal-regional-comunal.xlsx
2    ./data_income/numero-personas-administradoras.xlsx
3    ./data_income/numero-superficie-de-upa-bosque-nativo-regional-comunal.xlsx
4    ./data_income/superficie-categoría-cultivo-región-comuna.xlsx
5    ./data_income/numero-superficie-de-upa-frutales-regional-comunal.xlsx
6    ./data_income/tamaño-upa-región-comuna.xlsx
7    ./data_income/existencias-animales.xlsx
8    ./data_income/actividad-principal-región.xlsx
9    ./data_income/01_numero_superficie_de_upa_censadas_regional-xlsx.xlsx
10    ./data_income/numero-superficie-de-upa-aire-libre-bajo-cubierta-regional-comunal.xlsx
11    ./data_income/numero-upa-orientación-colmenas.xlsx
12    ./data_income/tipo-gestión-región-comuna.xlsx
13    ./data_income/numero-upa-practicas-mejoramiento-suelo.xlsx
14    ./data_income/superficie-principal-sistema-riego.xlsx
15    ./data_income/numero-productores-pueblos-originarios.xlsx
16    ./data_income/numero-de-upa-y-s

## 3. Reading data

In [87]:
from pyspark.sql import functions as F
from functools import reduce
import numpy as np
import unidecode



class spark_dataframe:
  def __init__(self, path_origin:str, path_export:str):
    """Standardising all data types from pandas to String for spark"""
    df = pd.read_excel(path_origin, sheet_name=0)
    df = df.astype(str)
    self.df_spark = sql.createDataFrame(df)
    self.df_spark.show()
    self.path_export = path_export
    self.path_origin = path_origin

  def remove_nan(self):
    """Replace nan with Null"""
    self.df_spark = self.df_spark.replace('nan', None)
    self.df_spark.show()
  
  def remove_comments(self):
    """Drop rows with null values on selected columns
       ---Removing comments---"""
    columns = self.df_spark.columns
    self.df_spark = self.df_spark.na.drop(
                  subset=columns[2:],how="all"
                )
    self.df_spark.show(12)

  def remove_empty_column(self):
    """ Drop columns with null values"""
    null_counts = self.df_spark.select(
                    [
                        F.count(
                            F.when(
                                F.col(a).isNull(), a
                                )
                            ).alias(a) 
                        for a in self.df_spark.columns
                    ]
                  ).collect()[0].asDict()

    to_delete = [ p for p, u in null_counts.items() if u == self.df_spark.count() ]

    self.df_spark = self.df_spark.drop(*to_delete)
    self.df_spark.show(truncate=True)
  
  def get_header_dataframe(self):
    """Getting those columns with null field (in other words, headers)"""
    cols = [F.col(c) for c in self.df_spark.columns]
    filter_expr = reduce(
        lambda a, b: a | b.isNull(), 
        cols[1:], 
        cols[0].isNull()
        )
    head_df = self.df_spark.filter(filter_expr)

    if head_df.isEmpty():
      expr = [F.first(col).alias(col) for col in self.df_spark.columns]
      head_df = self.df_spark.agg(*expr)

      """ Removing first row"""
      self.df_spark = spark.createDataFrame(self.df_spark.tail(self.df_spark.count()-1), self.df_spark.schema)

    
    head_df.show(truncate=False)

    return head_df
  ################################################################################################################
  def filling_data(self, multi_array): 
    for row in multi_array:
      last_value=None
      for i in range(len(row)):
        if ( row[i]==None and i>0 ):
          row[i] = last_value
        else:
          last_value = row[i]
    return multi_array

  def data_normalization(self, xl_array):  
    """It will swap: uppercase to lowercase, 
    blanckspace to underscore, and other"""
    xl_array = np.char.lower(xl_array)
    xl_array = np.char.strip(xl_array)
    xl_array = np.char.replace(xl_array, "-", "")
    xl_array = np.char.replace(xl_array, "  ", " ")
    xl_array = np.char.replace(xl_array, " ", "_")
    xl_array = np.char.replace(xl_array, "none", "")
    xl_array = np.char.replace(xl_array, ",", "")
    xl_array = np.char.replace(xl_array, "\n", "_")
    xl_array = np.char.replace(xl_array, "(", "")
    xl_array = np.char.replace(xl_array, ")", "")
    ##self.xl_array = np.char.replace(self.xl_array, ".", "")
    ##self.xl_array = np.char.replace(self.xl_array, ",", "")
    return xl_array

  # Removing non ascii and special characteres in columns
  def ascii_ignore(self, x):
    return unidecode.unidecode(x)

  # Mergin columns to unique ones
  def mergin_rows(self, array):
    a = []
    for row in array:
      a.append(np.char.array(row))
    b =np.char.array(0)
    for x in a:
      b = b + "__" +x
    b = b.replace('0___', '')
    b = b.replace('0_', '')
    b = [ self.ascii_ignore(x) for x in b ]
    return b
  ################################################################################################################
  
  def get_header_np(self):
    header_df = self.get_header_dataframe()
    header_np = np.array(header_df.select( header_df.columns ).collect())
    header_np = self.mergin_rows(self.data_normalization(self.filling_data(header_np).astype(str)))
    self.header_np = header_np

  def remove_any_null_rows(self):
    """removing whatever rows that contains null field"""
    self.df_spark = self.df_spark.na.drop(how="any")
    self.df_spark.show(truncate=False)

  def Establishing_header(self):
    self.get_header_np()
    self.df_spark = self.df_spark.toDF(*self.header_np)
    self.df_spark.show(truncate=False,n=10)

  def export_to_csv(self):
    self.df_spark.write.options(header='True', delimiter=',').format("csv").mode('overwrite').save(self.path_export + self.path_origin.split('/')[-1].split('.')[0] )


In [90]:
first = spark_dataframe(path_origin=folder.list_f[0], path_export="./data_outcome/")
first.remove_nan()
first.remove_comments()
first.remove_empty_column()
first.remove_any_null_rows()
first.Establishing_header()
first.export_to_csv()


+----------+--------------------+--------------+--------------------+--------------------+--------------------+
|Unnamed: 0|          Unnamed: 1|    Unnamed: 2|          Unnamed: 3|          Unnamed: 4|          Unnamed: 5|
+----------+--------------------+--------------+--------------------+--------------------+--------------------+
|       nan|9. Número y super...|           nan|                 nan|                 nan|                 nan|
|       nan|                 nan|           nan|                 nan|                 nan|                 nan|
|       nan|Total Nacional - ...|   Comuna 4 ,5|           Especie 6|Total\nNúmero de ...|Total\nSuperficie...|
|       nan|      Total Nacional|Total Nacional|               Total|               30364|   2068400.835026186|
|       nan|      Total Nacional|Total Nacional|      Sin clasificar|                 405|     2900.9570000005|
|       nan|      Total Nacional|Total Nacional|              Acacia|                 163|              