In [1]:
import os

import pyspark
from delta import *
from delta.tables import *
from pyspark.sql.functions import *

# NOTE(review): the wildcard imports above are kept as-is so later cells keep
# resolving the same names; be aware that pyspark.sql.functions shadows Python
# builtins such as sum/min/max.

# MinIO (S3-compatible) connection settings. They are read from the
# environment so real credentials never have to be committed with the
# notebook; the fallbacks reproduce the previous hard-coded values.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "datalake")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "datalake")

# Session builder: Delta Lake SQL extension + catalog, and the S3A connector
# pointed at MinIO. Each option is set exactly once (the Delta extension and
# catalog settings were previously listed twice).
builder = (
    pyspark.sql.SparkSession.builder.appName("delta")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    # Path-style access (http://host/bucket/key) instead of virtual-host buckets.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)

# configure_spark_with_delta_pip comes from the `delta` package imported above.
spark = configure_spark_with_delta_pip(builder).enableHiveSupport().getOrCreate()

In [2]:
# Bare expression: displays the SparkSession object as the cell output.
spark

In [18]:
# Bronze layer: load the raw user JSON from the bronze bucket.
df_bronze = spark.read.format("json").load("s3a://camada-bronze/user")

In [19]:
# Row count of the bronze DataFrame.
df_bronze.count()

61

In [20]:
# Silver layer: rewrite the bronze rows as Parquet, repartitioned to a single
# partition first, replacing whatever is already stored at the target path.
(
    df_bronze
    .repartition(1)
    .write
    .mode("overwrite")
    .parquet("s3a://camada-prata/user")
)

In [21]:
# Read the silver-layer Parquet data back into a DataFrame.
df_prata = spark.read.format("parquet").load("s3a://camada-prata/user")

In [22]:
# Print the (nested) schema of the silver DataFrame.
df_prata.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- coordinates: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lng: double (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- street_address: string (nullable = true)
 |    |-- street_name: string (nullable = true)
 |    |-- zip_code: string (nullable = true)
 |-- avatar: string (nullable = true)
 |-- credit_card: struct (nullable = true)
 |    |-- cc_number: string (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- email: string (nullable = true)
 |-- employment: struct (nullable = true)
 |    |-- key_skill: string (nullable = true)
 |    |-- title: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- password: string (nullable = true)
 |-- phone_numb

In [23]:
# Row count of the silver DataFrame, for comparison with the bronze count.
df_prata.count()

61

In [25]:
# Gold layer: keep only the top-level identity/contact columns plus selected
# fields of the nested `address` struct (picked by dotted path), and store the
# result as Parquet.
# mode='overwrite' makes the cell re-runnable: with no explicit mode the write
# fails as soon as the target path already exists, and it also matches the
# overwrite behaviour of the other layer writes in this notebook.
df_prata.select(
    'id',
    'uid',
    'username',
    'first_name',
    'last_name',
    'phone_number',
    'email',
    'date_of_birth',
    'gender',
    'address.city',
    'address.country',
    'address.state',
    'address.street_address',
    'address.street_name',
    'address.zip_code',
).write.parquet('s3a://camada-ouro/user_simple', mode='overwrite')


In [None]:
# Preview the silver DataFrame. The original cell referenced `df`, a name that
# no visible cell defines, so it raised NameError on a fresh kernel.
# NOTE(review): df_prata is assumed to be the intended DataFrame - confirm.
df_prata.show()

In [None]:
# Inspect the Python type of the silver DataFrame. The original cell used `df`,
# a name that no visible cell defines.
# NOTE(review): df_prata is assumed to be the intended DataFrame - confirm.
type(df_prata)

In [None]:
## DATA LAKEHOUSE

In [26]:
# Persist the bronze rows as a Delta table, replacing any existing data at
# the target path.
df_bronze.write.save(
    's3a://camada-prata/user_lakehouse',
    format="delta",
    mode='overwrite',
)

## MERGE

In [27]:
# Open the Delta table stored at the lakehouse path as a DeltaTable handle.
deltaTable = DeltaTable.forPath(spark,'s3a://camada-prata/user_lakehouse')

In [28]:
# Show the Python type of the handle.
type(deltaTable)

delta.tables.DeltaTable

In [29]:
# Incoming batch for the merge: read from the same bronze user JSON location.
df_novo = spark.read.format("json").load("s3a://camada-bronze/user")

In [30]:
# Preview the incoming rows.
df_novo.show()

+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------+-----------+----+-------------+----------+--------------------+-----------------------+--------------------+--------------------+-------------------+
|             address|              avatar|         credit_card|date_of_birth|               email|          employment|first_name|     gender|  id|    last_name|  password|        phone_number|social_insurance_number|        subscription|                 uid|           username|
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------+-----------+----+-------------+----------+--------------------+-----------------------+--------------------+--------------------+-------------------+
|{Ocieland, {-59.2...|https://robohash....|{4485-8020-1255-2...|   1966-03-11|sheri.runolfsdott...|{Organisation, Co...|     Sheri| Polygender|4595|Runolfsdo

In [None]:
# Keep one row per user id before merging. The merge joins on `id`; the
# previous key `city_id` is not a column of the user data (see the schema
# printed for the silver DataFrame), so deduplicating on it could not resolve.
df_novo = df_novo.drop_duplicates(['id'])

In [None]:
# Preview df_novo again after the de-duplication step.
df_novo.show()

In [None]:
# Upsert the incoming batch into the Delta table, matching rows on `id`:
#   - matched rows: only `first_name` is refreshed from the new data;
#   - unmatched rows: inserted with every source column.
# The previous insert clause mapped city_id / city / country_id / last_update,
# none of which exist in the user data, so it could not be resolved. Source
# and target are read from the same JSON location, so insert-all lines up
# column for column.
(
    deltaTable.alias('old')
    .merge(
        df_novo.alias('new'),
        'old.id = new.id',
    )
    .whenMatchedUpdate(set={"first_name": "new.first_name"})
    .whenNotMatchedInsertAll()
    .execute()
)

In [31]:
# Open the same Delta path again under a second handle.
deltaTable2 = DeltaTable.forPath(spark,'s3a://camada-prata/user_lakehouse')

In [32]:
# Convert the Delta table to a DataFrame and preview its rows.
deltaTable2.toDF().show()

+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------+-----------+----+-------------+----------+--------------------+-----------------------+--------------------+--------------------+--------------------+
|             address|              avatar|         credit_card|date_of_birth|               email|          employment|first_name|     gender|  id|    last_name|  password|        phone_number|social_insurance_number|        subscription|                 uid|            username|
+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+----------+-----------+----+-------------+----------+--------------------+-----------------------+--------------------+--------------------+--------------------+
|{Ocieland, {-59.2...|https://robohash....|{4485-8020-1255-2...|   1966-03-11|sheri.runolfsdott...|{Organisation, Co...|     Sheri| Polygender|4595|Runolf