In [1]:
import os
import sys
import socket

import sys,uuid,datetime
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# Object-store credentials are taken from the environment so that no secret
# is stored in the notebook itself.
key = os.environ["MINIO_ACCESS_KEY"]
secret = os.environ["MINIO_SECRET_KEY"]
# The endpoint used to be read from the environment and then immediately
# overwritten by a hard-coded address, which made the variable mandatory
# (KeyError when unset) yet ineffective. The environment value now wins and
# the former literal is only the fallback.
endpoint = os.environ.get("MINIO_SECRET_ENDPOINT", "http://192.168.2.128:9000")
print(endpoint)

#sc.stop()

# Build (or reuse) the Spark session: Kubernetes master in client mode,
# S3A access to the object store and Delta Lake support.
spark = (
    SparkSession.builder
    .master("k8s://https://kubernetes.docker.internal:6443")
    .appName("playing_with_immo24")
    # --- S3A file system ---
    .config("spark.hadoop.fs.s3a.access.key", key)
    .config("spark.hadoop.fs.s3a.secret.key", secret)
    .config("spark.hadoop.fs.s3a.endpoint", endpoint)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # --- Delta Lake ---
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0,org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # --- Kubernetes deployment ---
    .config('spark.submit.deployMode', 'client')
    .config("spark.kubernetes.container.image", "spark:spark-docker")
    .config("spark.kubernetes.pyspark.pythonVersion", "3")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "default")
    .config("spark.executor.instances", "1")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.kubernetes.executor.request.cores", "0.5")
    .config("spark.kubernetes.executor.limit.cores", "1")
    # NOTE(review): the next two keys do not start with "spark." -- they look
    # like deployment/chart values rather than Spark settings; confirm
    # whether they are needed at all.
    .config("jupyterService.jupyterPort_create_prop", "30888")
    .config("serviceAccount", "spark")
    .getOrCreate()
)

# Driver address settings that were tried earlier and left disabled:
#.config("spark.driver.host", "10.1.2.104")
#.config("spark.driver.port", "4040")

sc = spark.sparkContext
#sc._conf.getAll()

http://192.168.2.128:9000


# read dataframe

In [None]:
# Location of the raw staging file in the object store.
path_BE = "s3a://real-estate/staging/201031_Bern_buy_0_flat.gz"
# Load the staging file as JSON into a DataFrame.
df_props = spark.read.format("json").load(path_BE)

# helper functions

In [None]:
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer

# Flatten struct columns into top-level columns and explode array columns into rows
def flatten(df, prefix='propertyDetails', skip_columns=(
        "propertyDetails_images",
        "propertyDetails_pdfs",
        "propertyDetails_commuteTimes_defaultPois_transportations")):
    """Flatten the nested columns of ``df`` whose name starts with ``prefix``.

    The function repeatedly takes the first remaining complex column and

    * drops it when its name is listed in ``skip_columns``;
    * replaces a struct column by one top-level column per sub-field, named
      ``<column>_<subfield>``;
    * replaces an array column by its elements using ``explode_outer``,
      i.e. the array elements become rows.

    It stops when no array or struct column matching ``prefix`` is left.

    Args:
        df: DataFrame to flatten.
        prefix: Only complex columns whose name starts with this string are
            processed. Columns created from a struct keep the parent name as
            prefix, so a whole sub-tree is followed down to its leaves. Pass
            ``''`` to flatten every complex column.
        skip_columns: Names of complex columns that are dropped instead of
            being flattened.

    Returns:
        A new DataFrame without array/struct columns matching ``prefix``.
    """
    skip_columns = frozenset(skip_columns)

    def pending_complex_fields(frame):
        # The same filter is used for the first scan and for every rescan.
        # Previously the rescan ignored the prefix, so unrelated nested
        # columns were flattened or exploded only as a side effect of a
        # matching column being present.
        return {
            field.name: field.dataType
            for field in frame.schema.fields
            if isinstance(field.dataType, (ArrayType, StructType))
            and field.name.startswith(prefix)
        }

    complex_fields = pending_complex_fields(df)
    while complex_fields:
        # Always handle the first pending column; the schema is rescanned
        # after each step because every step changes it.
        col_name, col_type = next(iter(complex_fields.items()))

        if col_name in skip_columns:
            df = df.drop(col_name)
        elif isinstance(col_type, StructType):
            # Struct: promote every sub-field to its own column.
            expanded = [
                col(col_name + '.' + child.name).alias(col_name + '_' + child.name)
                for child in col_type
            ]
            df = df.select("*", *expanded).drop(col_name)
        elif isinstance(col_type, ArrayType):
            # Array: turn the elements into rows. The column then holds the
            # element type and is picked up again if that is still complex.
            df = df.withColumn(col_name, explode_outer(col_name))

        complex_fields = pending_complex_fields(df)

    return df

# Flatten the nested property records and show the resulting schema.
df_props_flatten = flatten(df_props)
df_props_flatten.printSchema()

# create delta table

In [3]:
# Storage location of the Delta table.
delta_path = "s3a://real-estate/lake/bronze/property"
# Table and database names under which the Delta table is registered.
delta_table_name = "property"
database = "immo"

In [None]:
# Write the flattened properties in Delta format, overwriting existing data
# at the target path.
bronze_writer = (
    df_props_flatten.write
    .format("delta")
    .mode('overwrite')
    .option("mergeSchema", True)
)
bronze_writer.save(delta_path)

# Make sure the database exists before registering the table in it.
create_database_sql = "CREATE DATABASE IF NOT EXISTS {}".format(database)
spark.sql(create_database_sql)

# Register the Delta files at delta_path as a table in the database.
create_table_sql = """
        CREATE TABLE IF NOT EXISTS {}.{}
        USING DELTA
        LOCATION "{}"
        """.format(database, delta_table_name, delta_path)
spark.sql(create_table_sql)

# query delta table

In [11]:
# Show the column metadata of the Delta table addressed by its path.
describe_sql = "DESCRIBE FORMATTED delta.`{}`".format(delta_path)
spark.sql(describe_sql).show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|propertyDetails_a...|   bigint|       |
|propertyDetails_a...|   string|       |
|propertyDetails_a...|   string|       |
|propertyDetails_c...|   bigint|       |
|propertyDetails_c...|   string|       |
|propertyDetails_c...|   bigint|       |
|propertyDetails_c...|   bigint|       |
|propertyDetails_d...|   string|       |
|propertyDetails_g...|   bigint|       |
|propertyDetails_h...|  boolean|       |
|propertyDetails_h...|  boolean|       |
|  propertyDetails_id|   bigint|       |
|propertyDetails_i...|   string|       |
|propertyDetails_i...|  boolean|       |
|propertyDetails_i...|  boolean|       |
|propertyDetails_i...|   string|       |
|propertyDetails_i...|  boolean|       |
|propertyDetails_i...|  boolean|       |
|propertyDetails_i...|  boolean|       |
|propertyDetails_l...|   string|       |
+--------------------+---------+-------+
only showing top

In [12]:
# Count the rows stored in the Delta table addressed by its path.
count_sql = "SELECT COUNT(*) FROM delta.`{}`".format(delta_path)
spark.sql(count_sql).show()

+--------+
|count(1)|
+--------+
|      20|
+--------+

