In [1]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists; otherwise start a new one
# registered under the application name "aula0".
spark = SparkSession.builder.appName("aula0").getOrCreate()

In [2]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [3]:
# Filesystem locations under the processing zone, keyed by dataset name.
path_processing = dict(
    user="/home/jovyan/work/datalake/s3/processing-zone/user",
    bank="/home/jovyan/work/datalake/s3/processing-zone/bank",
)

In [4]:
# ### Reading the data
# Each dataset is loaded as parquet from its path_processing entry and aliased
# straight away ("u" for users, "b" for banks) so that columns can later be
# referenced with qualified names such as "u.user_id".
df_user = spark.read.format("parquet").load(path_processing["user"]).alias("u")

df_bank = spark.read.format("parquet").load(path_processing["bank"]).alias("b")

In [5]:
from pyspark.sql.functions import col

# Inner-join users ("u") with banks ("b") on user_id, keeping the user's
# contact columns plus the bank name.
df_user_bank = df_user.join(
    df_bank,
    on=col("u.user_id") == col("b.user_id"),
    how="inner",
).select("u.user_id", "u.username", "u.email", "u.phone_number", "b.bank_name")

df_user_bank.show()

# Number of joined rows per bank.
agg_df_user_bank = (
    df_user_bank
    .groupBy("bank_name")
    .count()
)

agg_df_user_bank.show()

+-------+-----------------+--------------------+--------------------+--------------------+
|user_id|         username|               email|        phone_number|           bank_name|
+-------+-----------------+--------------------+--------------------+--------------------+
|    911|     bryan.hessel|bryan.hessel@emai...| +351 1-376-202-8010|          ABBEY LIFE|
|   8189|      sid.pollich|sid.pollich@email...|+86 1-370-764-611...|ALKEN ASSET MANAG...|
|   8189|      sid.pollich|sid.pollich@email...|+86 1-370-764-611...|ABN AMRO HOARE GO...|
|   6292|   tony.mcdermott|tony.mcdermott@em...| +249 1-761-469-0510|ABN AMRO HOARE GO...|
|   6292|   tony.mcdermott|tony.mcdermott@em...| +249 1-761-469-0510|ABN AMRO HOARE GO...|
|   6325|   duane.schaefer|duane.schaefer@em...|+91 (491) 908-440...|ALKEN ASSET MANAG...|
|   6907|    brigida.upton|brigida.upton@ema...|   +964 416-666-2945|ABBOTSTONE AGRICU...|
|   8459|    eunice.barton|eunice.barton@ema...|+258 1-180-295-80...|ABN AMRO MEZZANIN...|

In [9]:
# Filesystem locations under the delivery zone, keyed by output name.
path_delivery = dict(
    user_bank="/home/jovyan/work/datalake/s3/delivery-zone/user_bank",
    agg_df_user_bank="/home/jovyan/work/datalake/s3/delivery-zone/agg_df_user_bank",
)

In [14]:
### Writing the data
# Joined user/bank rows: parquet, partitioned on disk by bank_name, replacing
# whatever already exists at the target path.
df_user_bank.write.save(
    path_delivery["user_bank"],
    format="parquet",
    mode="overwrite",
    partitionBy="bank_name",
)

# Per-bank counts: parquet, unpartitioned, also replacing any previous output.
agg_df_user_bank.write.save(
    path_delivery["agg_df_user_bank"],
    format="parquet",
    mode="overwrite",
)