In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from delta import *
from os import PathLike
from hdfs import InsecureClient
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, FloatType
from pyspark.sql.functions import when, col, concat, lit

# Default root directory for managed databases and tables.
warehouse_location = 'hdfs://hdfs-nn:9000/AreasVerdes/warehouse'

# Parenthesised method chain instead of backslash continuations, so no line
# depends on what follows a trailing "\".
builder = (
    SparkSession.builder
    .master("local[2]")
    .appName("Python Spark DataFrames and SQL")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    # Delta Lake SQL extension and catalog implementation.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .enableHiveSupport()
)

# configure_spark_with_delta_pip is provided by the `from delta import *` line.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
# Location of the raw GreenStreets CSV that the following cells read.
hdfs_path = (
    "hdfs://hdfs-nn:9000/AreasVerdes/Bronze/GreenStreets.csv"
)

In [3]:
# Define the schema as (column name, Spark type) pairs, in declaration order.
# Every field is declared nullable when the StructType is assembled below.
schema_columns = [
    ("Hectares", FloatType()),
    ("Borough", StringType()),
    ("Commissiondate", StringType()),
    ("CommunityBoard", IntegerType()),
    ("Council_District", IntegerType()),
    ("Department", StringType()),
    ("Description", StringType()),
    ("FeatureStatus", StringType()),
    ("Gispropnum", StringType()),
    ("GSGroup", StringType()),
    ("GSType", StringType()),
    ("Location", StringType()),
    ("MOU", StringType()),
    ("NYS_Assembly", StringType()),
    ("NYS_Senate", IntegerType()),
    ("Omppropid", StringType()),
    ("Parent_ID", StringType()),
    ("Precinct", IntegerType()),
    ("Sitename", StringType()),
    ("STArea", FloatType()),
    ("STLenght", FloatType()),
    ("SubCategory", StringType()),
    ("System", StringType()),
    ("US_Congress", IntegerType()),
    ("ZipCode", IntegerType()),
    ("Multypolygon", StringType()),
]
customSchema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in schema_columns]
)

# Read the semicolon-delimited CSV (with a header row) using the explicit schema.
# NOTE(review): `input` shadows the Python builtin; the name is kept because
# the column-selection cell refers to it.
csv_reader = spark.read.option("delimiter", ";").option("header", "true").schema(customSchema)
input = csv_reader.csv(hdfs_path)

In [4]:
# Keep only the columns of interest, in this order.
wanted_columns = [
    "System",
    "Gispropnum",
    "Omppropid",
    "Department",
    "Parent_ID",
    "Location",
    "Hectares",
    "GSType",
    "SubCategory",
    "CommunityBoard",
    "Precinct",
    "ZipCode",
    "FeatureStatus",
    "STArea",
    "STLenght",
    "Multypolygon",
    "Borough",
]
gs = input.select(*wanted_columns)

In [5]:
# Drop rows that repeat a Gispropnum value, keeping one row per value.
dedup_keys = ["Gispropnum"]
gs = gs.dropDuplicates(subset=dedup_keys)

In [6]:
# GSType: replace missing values (NULL or empty string) with "Sem info".
# A comparison such as `column == None` evaluates to NULL in Spark and never
# matches, so missing values are detected with isNull() instead.
gs = gs.withColumn(
    "GSType",
    when(col("GSType").isNull() | (col("GSType") == ""), lit("Sem info"))
    .otherwise(col("GSType")),
)

In [7]:
# CommunityBoard: replace missing values with 0.
# The column is an IntegerType, so it cannot hold an empty string; the only
# missing-value case is NULL. The previous version tested GSType by mistake
# and used `== None`, which never matches in Spark, so only the fill below
# ever had an effect.
gs = gs.na.fill(0, subset=["CommunityBoard"])

In [8]:
# ZipCode: replace missing values with 0 and leave every other value untouched.
# The previous version fell through to `.otherwise(gs.CommunityBoard)` on
# every row (neither `== ""` on an integer column nor `== None` can match),
# which overwrote ZipCode with the CommunityBoard value.
gs = gs.na.fill(0, subset=["ZipCode"])

In [9]:
# Borough: keep only rows whose code is one of the accepted values.
listValues = ['B', 'Q', 'R', 'X', 'M']
borough_is_valid = col('Borough').isin(*listValues)
gs = gs.where(borough_is_valid)

In [10]:
# Convert acres to hectares by dividing by the acres-per-hectare factor.
# NOTE(review): the result overwrites "Hectares" in place, so re-running this
# cell divides the values again.
ACRES_PER_HECTARE = 2.471
hectares_value = col("Hectares") / ACRES_PER_HECTARE
gs = gs.withColumn("Hectares", hectares_value)


In [11]:
# Write the result in Delta format, partitioned by Borough, overwriting any
# data already stored at the target path.
output_columns = [
    "System",
    "Gispropnum",
    "Omppropid",
    "Department",
    "Parent_ID",
    "Location",
    "Hectares",
    "GSType",
    "SubCategory",
    "CommunityBoard",
    "Precinct",
    "ZipCode",
    "FeatureStatus",
    "STArea",
    "STLenght",
    "Multypolygon",
    "Borough",
]
output_path = "hdfs://hdfs-nn:9000/AreasVerdes/Silver/GreenStreets"

delta_writer = (
    gs.select(*output_columns)
    .write
    .mode("overwrite")
    .partitionBy("Borough")
    .format("delta")
)
delta_writer.save(output_path)

In [None]:
# Stop the SparkSession created in the first cell now that the write is done.
spark.stop()