In [1]:
from os import PathLike
from hdfs import InsecureClient
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
from delta import *
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, FloatType, DateType
import pyspark.sql.functions as f

In [2]:
#SPARK CONFIG
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *

#Warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs://hdfs-nn:9000/demo/silver'

builder = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .enableHiveSupport() \

spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()



:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cee54a41-5b01-4927-bd84-0cedcf9e03da;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 452ms :: artifacts dl 17ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-run

In [3]:
#(drop da tabela database do hdfs F_SP_League)
spark.sql(
    """
    DROP DATABASE IF EXISTS F_SP_League CASCADE
    """
)

#(drop da deltalake table F_SP_League)
spark.sql(
    """
    DROP TABLE IF EXISTS F_SP_League.deltalake_table
    """
)

#CRIAR F_SP_League.db
spark.sql(
    """
    CREATE DATABASE F_SP_League LOCATION 'hdfs://hdfs-nn:9000/demo/silver/F_SP_League.db/'
    """
)

#CRIAR A DELTALAKE TABLE
spark.sql(
    """
    CREATE EXTERNAL TABLE F_SP_League.deltalake_table (
        HomeTeam STRING,
        AwayTeam STRING,
        FTR STRING,
        Date STRING,
        FTHG INT,
        FTAG INT,
        HS INT,
        AS INT,
        HST INT,
        AST INT,
        HC INT,
        AC INT
    )
    USING DELTA
    PARTITIONED BY (
         Div STRING
    )
    LOCATION 'hdfs://hdfs-nn:9000/demo/silver/F_SP_League.db/deltalake_table/'
    """
)

22/01/17 21:00:13 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `f_sp_league`.`deltalake_table` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
22/01/17 21:00:13 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


DataFrame[]

In [4]:
from pyspark.sql.functions import col

#READ THE .CSV FILE IN HDFS AND PUT IT IN A DATAFRAME
hdfs_path = "hdfs://hdfs-nn:9000/demo/bronze/F_SP_League.csv"

#DEFINE THE SCHEMA OF THE DATAFRAME
customSchema = StructType([
    StructField("Div", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("HomeTeam", StringType(), True),        
    StructField("AwayTeam", StringType(), True),
    StructField("FTHG", IntegerType(), True),
    StructField("FTAG", IntegerType(), True),
    StructField("FTR", StringType(), True),
    StructField("HTHG", IntegerType(), True),
    StructField("HTAG", IntegerType(), True),
    StructField("HTR", StringType(), True),
    StructField("HS", IntegerType(), True),
    StructField("AS", IntegerType(), True),
    StructField("HST", IntegerType(), True),
    StructField("AST", IntegerType(), True),
    StructField("HF", IntegerType(), True),
    StructField("AF", IntegerType(), True),
    StructField("HC", IntegerType(), True),
    StructField("AC", IntegerType(), True),
    StructField("HY", IntegerType(), True),
    StructField("AY", IntegerType(), True),
    StructField("HR", IntegerType(), True),
    StructField("AR", IntegerType(), True),
    StructField("B365H", FloatType(), True),
    StructField("B365D", FloatType(), True),
    StructField("B365A", FloatType(), True),
    StructField("BWH", FloatType(), True),
    StructField("BWD", FloatType(), True),
    StructField("BWA", FloatType(), True),
    StructField("IWH", FloatType(), True),
    StructField("IWD", FloatType(), True),
    StructField("IWA", FloatType(), True),
    StructField("PSH", FloatType(), True),
    StructField("PSD", FloatType(), True),
    StructField("PSA", FloatType(), True),
    StructField("WHH", FloatType(), True),
    StructField("WHD", FloatType(), True),
    StructField("WHA", FloatType(), True),
    StructField("VCH", FloatType(), True),
    StructField("VCD", FloatType(), True),
    StructField("VCA", FloatType(), True),
    StructField("Bb1X2", IntegerType(), True),
    StructField("BbMxH", FloatType(), True),
    StructField("BbAvH", FloatType(), True),
    StructField("BbMxD", FloatType(), True),
    StructField("BbAvD", FloatType(), True),
    StructField("BbMxA", FloatType(), True),
    StructField("BbAvA", FloatType(), True),
    StructField("BbOU", IntegerType(), True),
    StructField("BbMx>2.5", FloatType(), True),
    StructField("BbAv>2.5", FloatType(), True),
    StructField("BbMx<2.5", FloatType(), True),
    StructField("BbAv<2.5", FloatType(), True),
    StructField("BbAH", IntegerType(), True),
    StructField("BbAHh", FloatType(), True),
    StructField("BbMxAHH", FloatType(), True),
    StructField("BbAvAHH", FloatType(), True),
    StructField("BbMxAHA", FloatType(), True),
    StructField("BbAvAHA", FloatType(), True),
    StructField("PSCH", FloatType(), True),
    StructField("PSCD", FloatType(), True),
    StructField("PSCA", FloatType(), True)
])

st_bd = spark \
    .read \
    .option("delimiter",",") \
    .option("header","true") \
    .schema(customSchema) \
    .csv(hdfs_path) \

st_bd.show()
st_bd.printSchema()

22/01/17 21:00:15 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---+--------+--------------------+--------------------+----+----+---+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+-----+-----+-----+----+----+----+----+----+----+----+----+----+-----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+----+--------+--------+--------+--------+----+-----+-------+-------+-------+-------+----+-----+----+
|Div|    Date|            HomeTeam|            AwayTeam|FTHG|FTAG|FTR|HTHG|HTAG|HTR| HS| AS|HST|AST| HF| AF| HC| AC| HY| AY| HR| AR|B365H|B365D|B365A| BWH| BWD| BWA| IWH| IWD| IWA| PSH| PSD| PSA|  WHH| WHD| WHA| VCH| VCD| VCA|Bb1X2|BbMxH|BbAvH|BbMxD|BbAvD|BbMxA|BbAvA|BbOU|BbMx>2.5|BbAv>2.5|BbMx<2.5|BbAv<2.5|BbAH|BbAHh|BbMxAHH|BbAvAHH|BbMxAHA|BbAvAHA|PSCH| PSCD|PSCA|
+---+--------+--------------------+--------------------+----+----+---+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+-----+-----+-----+----+----+----+----+----+----+----+----+----+-----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+

22/01/17 21:00:16 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 67, schema size: 61
CSV file: hdfs://hdfs-nn:9000/demo/bronze/F_SP_League.csv


In [5]:
#WRITE THE DATAFRAME TO HIVE DELTALAKE TABLE

st_bd \
    .select("HomeTeam","AwayTeam", "FTR", "DATE", "HS", "AS", "FTHG", "FTAG", "HST", "AST", "HC", "AC", "Div") \
    .write \
    .mode("overwrite") \
    .partitionBy("Div") \
    .format("delta") \
    .save("hdfs://hdfs-nn:9000/demo/silver/F_SP_League.db/deltalake_table/")

                                                                                

In [6]:
#MOSTRAR A DELTALAKE TABLE

spark.sql(
    """
    SELECT *
    FROM F_SP_League.deltalake_table
    """
).show()

+--------------------+--------------------+---+--------+----+----+---+---+---+---+---+---+---+
|            HomeTeam|            AwayTeam|FTR|    Date|FTHG|FTAG| HS| AS|HST|AST| HC| AC|Div|
+--------------------+--------------------+---+--------+----+----+---+---+---+---+---+---+---+
|               Reims|            Paris SG|  D|08/08/14|   2|   2|  9| 16|  3|  6|  1|  5| F1|
|              Bastia|           Marseille|  D|09/08/14|   3|   3| 13|  9|  4|  4|  1|  4| F1|
|Evian Thonon Gail...|                Caen|  A|09/08/14|   0|   3| 10| 12|  2|  7|  5|  6| F1|
|            Guingamp|          St Etienne|  A|09/08/14|   0|   2|  6|  7|  3|  2|  4|  5| F1|
|               Lille|                Metz|  D|09/08/14|   0|   0| 14|  2|  3|  1|  8|  3| F1|
|         Montpellier|            Bordeaux|  A|09/08/14|   0|   1| 15|  7|  3|  3|  5|  1| F1|
|              Nantes|                Lens|  H|09/08/14|   1|   0| 14|  5|  4|  2|  3|  2| F1|
|                Nice|            Toulouse|  H|09/

In [7]:
#TRANSFORMATIONS

In [8]:
#ALTERAR A COLUNA DIV

#(F1 para Ligue_1)

spark.sql(
    """    
    UPDATE F_SP_League.deltalake_table
    SET Div = REPLACE(Div, 'F1', 'France')
    """
)

                                                                                

DataFrame[]

In [9]:
#(SP1 para La_Liga)

spark.sql(
    """    
    UPDATE F_SP_League.deltalake_table
    SET Div = REPLACE(Div, 'SP1', 'Spain')
    """
)

                                                                                

DataFrame[]

In [10]:
#ALTERAR A COLUNA FTR

#(H para Home)

spark.sql(
    """    
    UPDATE F_SP_League.deltalake_table
    SET FTR = REPLACE(FTR, 'H', 'Home')
    """
)

                                                                                

DataFrame[]

In [11]:
#(A para Away)

spark.sql(
    """    
    UPDATE F_SP_League.deltalake_table
    SET FTR = REPLACE(FTR, 'A', 'Away')
    """
)

                                                                                

DataFrame[]

In [12]:
#(D para Draw)

spark.sql(
    """    
    UPDATE F_SP_League.deltalake_table
    SET FTR = REPLACE(FTR, 'D', 'Draw')
    """
)

                                                                                

DataFrame[]

In [13]:
#DATA QUALITY TREATMENT

In [14]:
#LOAD THE DATAFRAME
st_bd = spark.read.format("delta").load("hdfs://hdfs-nn:9000/demo/silver/F_SP_League.db/deltalake_table/")

In [15]:
#ALTERAR A DATA DE STRING PARA DATE (...)

st_bd.select(to_date(st_bd.Date).alias('Date'))

DataFrame[Date: date]

In [16]:
#RENOMEAR AS COLUNAS (...)

st_bd    .withColumnRenamed("FTR", "Result")    .withColumnRenamed("DATE", "Date")    .withColumnRenamed("FTHG", "HomeT_Goals")    .withColumnRenamed("FTAG", "AwayT_Goals")    .withColumnRenamed("HST", "HomeT_Shots_Target")    .withColumnRenamed("AST", "AwayT_Shots_Target")    .withColumnRenamed("HC", "HomeT_Corners")     .withColumnRenamed("AC", "AwayT_Corners")    .withColumnRenamed("Div", "Division")

DataFrame[HomeTeam: string, AwayTeam: string, Result: string, Date: string, HomeT_Goals: int, AwayT_Goals: int, HS: int, AS: int, HomeT_Shots_Target: int, AwayT_Shots_Target: int, HomeT_Corners: int, AwayT_Corners: int, Division: string]

In [17]:
#VERIFY IF EXISTS ANY DUPLICATED VALUES
import pyspark.sql.functions as f
st_bd.join(
    st_bd.groupBy(st_bd.columns).agg((f.count("*")>1).cast("int").alias("Duplicate_indicator")),
    on=st_bd.columns,
    how="inner"
).show()

+----------+--------------------+----+--------+----+----+---+---+---+---+---+---+------+-------------------+
|  HomeTeam|            AwayTeam| FTR|    Date|FTHG|FTAG| HS| AS|HST|AST| HC| AC|   Div|Duplicate_indicator|
+----------+--------------------+----+--------+----+----+---+---+---+---+---+---+------+-------------------+
|St Etienne|                Lyon|Home|30/11/14|   3|   0| 11| 12|  5|  3|  4|  6|France|                  0|
|     Lille|Evian Thonon Gail...|Home|07/01/15|   1|   0| 11|  6|  3|  2|  3|  6|France|                  0|
|      Lyon|                Metz|Home|25/01/15|   2|   0| 21|  5|  6|  2|  6|  2|France|                  0|
|    Rennes|           Marseille|Draw|07/02/15|   1|   1| 11| 10|  2|  2|  4|  4|France|                  0|
|  Guingamp|                Lyon|Away|04/04/15|   1|   3|  4| 10|  1|  6|  0|  3|France|                  0|
|     Lille|               Reims|Home|04/04/15|   3|   1| 14| 10|  8|  3|  0|  2|France|                  0|
| Marseille|       

In [18]:
#COUNT THE NUMBER OF LINES THAT ARE ON THE DELTALAKE TABLE
spark.sql(
    """
    Select COUNT(*) as Number_of_lines FROM F_SP_League.deltalake_table
    """
).show()

+---------------+
|Number_of_lines|
+---------------+
|           3800|
+---------------+



In [19]:
#IDENTIFICAR VALORES DISTINTOS 
#(por exemplo, da coluna HomeTeam)

st_bd.select('HomeTeam').distinct().collect()

[Row(HomeTeam='Nimes'),
 Row(HomeTeam='Nice'),
 Row(HomeTeam='Montpellier'),
 Row(HomeTeam='Dijon'),
 Row(HomeTeam='Betis'),
 Row(HomeTeam='Angers'),
 Row(HomeTeam='Lille'),
 Row(HomeTeam='Ath Madrid'),
 Row(HomeTeam='Nantes'),
 Row(HomeTeam='Ath Bilbao'),
 Row(HomeTeam='Marseille'),
 Row(HomeTeam='Espanol'),
 Row(HomeTeam='Malaga'),
 Row(HomeTeam='Villarreal'),
 Row(HomeTeam='Real Madrid'),
 Row(HomeTeam='Caen'),
 Row(HomeTeam='Almeria'),
 Row(HomeTeam='La Coruna'),
 Row(HomeTeam='Lens'),
 Row(HomeTeam='Elche'),
 Row(HomeTeam='Lyon'),
 Row(HomeTeam='Vallecano'),
 Row(HomeTeam='Leganes'),
 Row(HomeTeam='Granada'),
 Row(HomeTeam='Barcelona'),
 Row(HomeTeam='Monaco'),
 Row(HomeTeam='Bordeaux'),
 Row(HomeTeam='Celta'),
 Row(HomeTeam='Troyes'),
 Row(HomeTeam='Ajaccio GFCO'),
 Row(HomeTeam='Eibar'),
 Row(HomeTeam='Guingamp'),
 Row(HomeTeam='Toulouse'),
 Row(HomeTeam='Strasbourg'),
 Row(HomeTeam='Sociedad'),
 Row(HomeTeam='Reims'),
 Row(HomeTeam='St Etienne'),
 Row(HomeTeam='Rennes'),
 Row(H

In [20]:
#IDENTIFICAR VALORES NULOS
#(por exemplo, da coluna HST)

st_bd.filter(st_bd.HST.isNull()).show()

+--------+--------+----+--------+----+----+----+----+----+----+----+----+------+
|HomeTeam|AwayTeam| FTR|    Date|FTHG|FTAG|  HS|  AS| HST| AST|  HC|  AC|   Div|
+--------+--------+----+--------+----+----+----+----+----+----+----+----+------+
|  Bastia|    Lyon|Away|16/04/17|   0|   3|null|null|null|null|null|null|France|
+--------+--------+----+--------+----+----+----+----+----+----+----+----+------+



In [21]:
#IDENTIFICAR VALORES DISTINTOS 
st_bd.select('Div').distinct().collect()



[Row(Div='France'), Row(Div='Spain')]

In [22]:
st_bd \
    .write \
    .mode("overwrite") \
    .format("delta") \
    .save("hdfs://hdfs-nn:9000/demo/silver/F_SP_League.db/deltalake_table/")

                                                                                