In [None]:
!pip install python-dotenv

import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, date_format, monotonically_increasing_id, year, month, day, dayofmonth, dayofweek, quarter, weekofyear, dayofyear
from pyspark.sql.types import IntegerType,BooleanType,DateType
from pyspark.sql.functions import regexp_replace
from google.cloud.storage import Client, transfer_manager

# Load variables from a local .env file into the process environment; later
# cells read TEMP_SPARK_BUCKET, DATASET_NAME and PROJECT_ID through os.getenv.
load_dotenv()



True

# Download dos arquivos a partir do bucket raw

In [None]:
# Point the Google client libraries at the service-account key file used by
# this notebook (the Spark session below reads the same variable).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "credentials/credentials.json"

gcs_client = Client()
bucket = gcs_client.bucket("fraud-detection-staging-aso")

# exist_ok keeps the cell idempotent on re-run without a separate existence check.
os.makedirs("temp", exist_ok=True)

blob_names = [blob.name for blob in bucket.list_blobs()]

results = transfer_manager.download_many_to_path(
    bucket, blob_names, destination_directory="temp", max_workers=2
)

# download_many_to_path does not raise by default: a failed download comes back
# as an exception instance in `results` (same order as `blob_names`). Surface
# those failures here instead of letting a later spark.read fail on a missing file.
failed_downloads = [
    (blob_name, outcome)
    for blob_name, outcome in zip(blob_names, results)
    if isinstance(outcome, Exception)
]
if failed_downloads:
    failure_details = "; ".join(
        f"{blob_name}: {outcome!r}" for blob_name, outcome in failed_downloads
    )
    raise RuntimeError(
        f"Failed to download {len(failed_downloads)} blob(s) from the bucket: {failure_details}"
    )

## Instanciação do spark (aqui deve ser feita a autenticação com o GCP)

In [None]:
from google.colab import auth

# Interactive Colab sign-in for the current user.
auth.authenticate_user()

# Build (or reuse) the Spark session with the BigQuery connector jar and the
# service-account key file taken from GOOGLE_APPLICATION_CREDENTIALS.
# TEMP_SPARK_BUCKET is read from the environment and passed as the
# connector's temporaryGcsBucket setting.
spark = (
    SparkSession.builder
    .config("spark.jars", "gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.2.jar")
    .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .config(
        "spark.hadoop.google.cloud.auth.service.account.json.keyfile",
        os.getenv("GOOGLE_APPLICATION_CREDENTIALS"),
    )
    .config("temporaryGcsBucket", os.getenv("TEMP_SPARK_BUCKET"))
    .getOrCreate()
)

# Tratamento do `cards_data.csv`

Análise exploratória do dataset

In [None]:
# Read the cards CSV (first row is the header, column types are inferred)
# and preview the first three rows.
df = spark.read.csv('temp/cards_data.csv', header=True, inferSchema=True)
df.show(3)

+----+---------+----------+---------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|  id|client_id|card_brand|card_type|     card_number|expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|
+----+---------+----------+---------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|4524|      825|      Visa|    Debit|4344676511950444|12/2022|623|     YES|               2|      $24295|       09/2002|                 2008|              No|
|2731|      825|      Visa|    Debit|4956965974959986|12/2020|393|     YES|               2|      $21968|       04/2014|                 2014|              No|
|3701|      825|      Visa|    Debit|4582313478255491|02/2024|719|     YES|               2|      $46414|       07/2003|                 2004|              No|
+----+---------+----------+---------+---

In [None]:
# Show the column types Spark inferred for the cards dataset.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- expires: string (nullable = true)
 |-- cvv: integer (nullable = true)
 |-- has_chip: string (nullable = true)
 |-- num_cards_issued: integer (nullable = true)
 |-- credit_limit: string (nullable = true)
 |-- acct_open_date: string (nullable = true)
 |-- year_pin_last_changed: integer (nullable = true)
 |-- card_on_dark_web: string (nullable = true)



In [None]:
# Total number of rows in the cards dataset.
df.count()

6146

In [None]:
# Count the null values of every column (one filter + count per column),
# then print one "<column> <null count>" line per column.
null_counts = {
    column: df.filter(df[column].isNull()).count() for column in df.columns
}
for col_name, n_nulls in null_counts.items():
  print(col_name, n_nulls)

id 0
client_id 0
card_brand 0
card_type 0
card_number 0
expires 0
cvv 0
has_chip 0
num_cards_issued 0
credit_limit 0
acct_open_date 0
year_pin_last_changed 0
card_on_dark_web 0


In [None]:
# Summary statistics (count, mean, ...) for each column of the cards dataset.
df.describe().show()

+-------+------------------+-----------------+----------+---------------+--------------------+-------+-----------------+--------+------------------+------------+--------------+---------------------+----------------+
|summary|                id|        client_id|card_brand|      card_type|         card_number|expires|              cvv|has_chip|  num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|
+-------+------------------+-----------------+----------+---------------+--------------------+-------+-----------------+--------+------------------+------------+--------------+---------------------+----------------+
|  count|              6146|             6146|      6146|           6146|                6146|   6146|             6146|    6146|              6146|        6146|          6146|                 6146|            6146|
|   mean|            3072.5|994.9396355353075|      NULL|           NULL|4.820425803848972E15|   NULL|506.2207940123658|    NULL|1.50309

Drop da coluna sensível `cvv`, alteração dos tipos e remoção de caracteres especiais dos dados, para evitar problemas na etapa de BI.

In [None]:
# Clean the cards frame for the BI stage:
#   * drop the sensitive `cvv` column,
#   * parse the "MM/yyyy" strings into dates,
#   * cast the YES/No flag columns to booleans,
#   * strip the "$" sign from `credit_limit` (the column stays a string).
# NOTE(review): the full `card_number` is kept in the output; confirm that is intended.
cards = df.drop('cvv')
for date_column in ("expires", "acct_open_date"):
  cards = cards.withColumn(date_column, to_date(cards[date_column], "MM/yyyy"))
for flag_column in ("has_chip", "card_on_dark_web"):
  cards = cards.withColumn(flag_column, cards[flag_column].cast(BooleanType()))
df = cards.withColumn("credit_limit", regexp_replace("credit_limit", "\\$", ""))


In [None]:
# Preview the cleaned cards data.
df.show(3)

+----+---------+----------+---------+----------------+----------+--------+----------------+------------+--------------+---------------------+----------------+
|  id|client_id|card_brand|card_type|     card_number|   expires|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|
+----+---------+----------+---------+----------------+----------+--------+----------------+------------+--------------+---------------------+----------------+
|4524|      825|      Visa|    Debit|4344676511950444|2022-12-01|    true|               2|       24295|    2002-09-01|                 2008|           false|
|2731|      825|      Visa|    Debit|4956965974959986|2020-12-01|    true|               2|       21968|    2014-04-01|                 2014|           false|
|3701|      825|      Visa|    Debit|4582313478255491|2024-02-01|    true|               2|       46414|    2003-07-01|                 2004|           false|
+----+---------+----------+---------+---------

In [None]:
# Confirm the column types after the casts above.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- expires: date (nullable = true)
 |-- has_chip: boolean (nullable = true)
 |-- num_cards_issued: integer (nullable = true)
 |-- credit_limit: string (nullable = true)
 |-- acct_open_date: date (nullable = true)
 |-- year_pin_last_changed: integer (nullable = true)
 |-- card_on_dark_web: boolean (nullable = true)



In [None]:
# Rename the key to `card_id` (the name the transactions data uses for it)
# and export the frame as a single-part CSV directory with a header row.
df_cards = df.withColumnRenamed("id", "card_id")
save_path = "temp/cards_data"
(
    df_cards.coalesce(1)
    .write.mode('overwrite')
    .option("header", True)
    .option("delimiter", ",")
    .csv(save_path)
)

In [None]:
import shutil

# Spark wrote a directory of part files under `save_path`; take the first
# .csv part and move it to the final datasets folder as dim_card.csv.
final_path = "temp/final_datasets"
os.makedirs(final_path, exist_ok=True)

part_files = [file for file in os.listdir(save_path) if file.endswith('.csv')]
csv_name = part_files[0]
shutil.move(f"{save_path}/{csv_name}", f"{final_path}/dim_card.csv")

'temp/final_datasets/dim_card.csv'

# Tratamento do `users_data.csv`

In [None]:
# Read the users CSV (first row is the header, column types are inferred).
df = spark.read.csv('temp/users_data.csv', header=True, inferSchema=True)

Análise exploratória do dataset

In [None]:
# Show the column types Spark inferred for the users dataset.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: string (nullable = true)
 |-- yearly_income: string (nullable = true)
 |-- total_debt: string (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)



In [None]:
# Total number of rows in the users dataset.
df.count()

2000

In [None]:
# Count the null values of every column (one filter + count per column),
# then print one "<column> <null count>" line per column.
null_counts = {
    column: df.filter(df[column].isNull()).count() for column in df.columns
}
for col_name, n_nulls in null_counts.items():
  print(col_name, n_nulls)

id 0
current_age 0
retirement_age 0
birth_year 0
birth_month 0
gender 0
address 0
latitude 0
longitude 0
per_capita_income 0
yearly_income 0
total_debt 0
credit_score 0
num_credit_cards 0


In [None]:
# Look at a sample of `address` values to judge whether the column is worth keeping.
df.select('address').show(10)

+--------------------+
|             address|
+--------------------+
|       462 Rose Lane|
|3606 Federal Boul...|
|     766 Third Drive|
|    3 Madison Street|
|9620 Valley Strea...|
|       58 Birch Lane|
|   5695 Fifth Street|
|   1941 Ninth Street|
|    11 Spruce Avenue|
|    887 Grant Street|
+--------------------+
only showing top 10 rows



Optei por dropar as colunas:
- `address`: Por não conter dados significativos para uma possível análise de negócio. Caso possuísse dados como cidade, estado e país, seriam dados muito interessantes de serem mantidos e trabalhados.
- `latitude` e `longitude`: Pois, apesar de podermos obter dados significativos a partir deles, geraria um trabalho adicional de conversão dos valores de lat e long em dados de valor.

In [None]:
# Drop the location columns and strip the "$" sign from the monetary columns
# (they remain strings after the replacement).
users = df.drop("latitude", "longitude", "address")
for money_column in ("per_capita_income", "yearly_income", "total_debt"):
  users = users.withColumn(money_column, regexp_replace(users[money_column], "\\$", ""))
df = users

In [None]:
# Preview the cleaned users data.
df.show(3)

+----+-----------+--------------+----------+-----------+------+-----------------+-------------+----------+------------+----------------+
|  id|current_age|retirement_age|birth_year|birth_month|gender|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+----+-----------+--------------+----------+-----------+------+-----------------+-------------+----------+------------+----------------+
| 825|         53|            66|      1966|         11|Female|            29278|        59696|    127613|         787|               5|
|1746|         53|            68|      1966|         12|Female|            37891|        77254|    191349|         701|               5|
|1718|         81|            67|      1938|         11|Female|            22681|        33483|       196|         698|               5|
+----+-----------+--------------+----------+-----------+------+-----------------+-------------+----------+------------+----------------+
only showing top 3 rows



In [None]:
# Rename the key to `user_id` and export the frame as a single-part CSV
# directory with a header row.
df_users = df.withColumnRenamed("id", "user_id")
save_path = "temp/users_data"
(
    df_users.coalesce(1)
    .write.mode('overwrite')
    .option("header", True)
    .option("delimiter", ",")
    .csv(save_path)
)

In [None]:
# Spark wrote a directory of part files under `save_path`; take the first
# .csv part and move it to the final datasets folder as dim_user.csv.
final_path = "temp/final_datasets"
os.makedirs(final_path, exist_ok=True)

part_files = [file for file in os.listdir(save_path) if file.endswith('.csv')]
csv_name = part_files[0]
shutil.move(f"{save_path}/{csv_name}", f"{final_path}/dim_user.csv")

'temp/final_datasets/dim_user.csv'

# Tratamento do `transactions_data.csv`

In [None]:
# Read the transactions CSV (first row is the header, column types are
# inferred) and preview the first three rows.
df = spark.read.csv("temp/transactions_data.csv", header=True, inferSchema=True)
df.show(3)

+---+-------+-------------------+---------+-------+-------+-----------------+-----------+-------------+--------------+-------+----+------+
|_c0|     id|               date|client_id|card_id| amount|         use_chip|merchant_id|merchant_city|merchant_state|    zip| mcc|errors|
+---+-------+-------------------+---------+-------+-------+-----------------+-----------+-------------+--------------+-------+----+------+
|  0|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00|Swipe Transaction|      59935|       Beulah|            ND|58523.0|5499|  NULL|
|  1|7475328|2010-01-01 00:02:00|      561|   4575| $14.57|Swipe Transaction|      67570|   Bettendorf|            IA|52722.0|5311|  NULL|
|  2|7475329|2010-01-01 00:02:00|     1129|    102| $80.00|Swipe Transaction|      27092|        Vista|            CA|92084.0|4829|  NULL|
+---+-------+-------------------+---------+-------+-------+-----------------+-----------+-------------+--------------+-------+----+------+
only showing top 3 rows



In [None]:
# `_c0` is the unnamed first column of the CSV (0, 1, 2, ... in the preview
# above; presumably a row index written by the export tool), so drop it.
df = df.drop("_c0")

In [None]:
# Show the column types Spark inferred for the transactions dataset.
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)



In [None]:
# Total number of rows in the transactions dataset.
df.count()

20000

In [None]:
# Count the null values of every column (one filter + count per column),
# then print one "<column> <null count>" line per column.
null_counts = {
    column: df.filter(df[column].isNull()).count() for column in df.columns
}
for col_name, n_nulls in null_counts.items():
  print(col_name, n_nulls)

id 0
date 0
client_id 0
card_id 0
amount 0
use_chip 0
merchant_id 0
merchant_city 0
merchant_state 2247
zip 2302
mcc 0
errors 19676


Podemos identificar que existem rows onde a coluna `zip` é nula, mesmo possuindo `merchant_state`

In [None]:
# Rows that have a merchant_state but no zip value.
df.filter((df.merchant_state.isNotNull()) & (df.zip.isNull())).show()

+-------+-------------------+---------+-------+-------+-----------------+-----------+---------------+------------------+----+----+------+
|     id|               date|client_id|card_id| amount|         use_chip|merchant_id|  merchant_city|    merchant_state| zip| mcc|errors|
+-------+-------------------+---------+-------+-------+-----------------+-----------+---------------+------------------+----+----+------+
|7476010|2010-01-01 07:53:00|     1579|   3830|  $6.51|Swipe Transaction|      22204|Puerto Vallarta|            Mexico|NULL|5541|  NULL|
|7476549|2010-01-01 10:03:00|      363|   5555|  $8.16|Swipe Transaction|      93391|   Vatican City|      Vatican City|NULL|5812|  NULL|
|7476704|2010-01-01 10:37:00|      363|   5555| $11.66|Swipe Transaction|      93391|   Vatican City|      Vatican City|NULL|5812|  NULL|
|7477375|2010-01-01 12:50:00|     1266|   2478|  $9.77|Swipe Transaction|      22204|    Guadalajara|            Mexico|NULL|5541|  NULL|
|7477534|2010-01-01 13:19:00|     

E que quando a cidade da transação é `"ONLINE"`, o state e zip são nulos.

In [None]:
# Rows whose merchant_state is null.
df.filter((df.merchant_state.isNull())).show()
# Alternative check kept for reference: ONLINE rows that do have a state.
# df.filter((df.merchant_city == "ONLINE") & (df.merchant_state.isNotNull())).show()

+-------+-------------------+---------+-------+-------+------------------+-----------+-------------+--------------+----+----+------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|merchant_city|merchant_state| zip| mcc|errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+-------------+--------------+----+----+------+
|7475335|2010-01-01 00:14:00|     1684|   2140| $26.46|Online Transaction|      39021|       ONLINE|          NULL|NULL|4784|  NULL|
|7475336|2010-01-01 00:21:00|      335|   5131|$261.58|Online Transaction|      50292|       ONLINE|          NULL|NULL|7801|  NULL|
|7475346|2010-01-01 00:34:00|      394|   4717| $26.04|Online Transaction|      39021|       ONLINE|          NULL|NULL|4784|  NULL|
|7475353|2010-01-01 00:43:00|      301|   3742| $10.17|Online Transaction|      39021|       ONLINE|          NULL|NULL|4784|  NULL|
|7475356|2010-01-01 00:45:00|      566|   3439| $16.86|Online Transac

In [None]:
# df.select("errors").show()
# Number of transactions that carry a non-null `errors` value.
df.filter(df.errors.isNotNull()).count()

324

Em um cenário real, valeria contatar a área de negócios e entender se essas variáveis são dados importantes, mesmo que nulas. Para este exemplo, optei por excluir as linhas de dados nulos na coluna `merchant_state`, priorizando a demonstração de toda a pipeline em funcionamento. Também optei por dropar a coluna `zip`, já que as variáveis `merchant_city` e `merchant_state` cumprem a função de informar localidade. O mesmo para a variável `errors`, pois apresenta quantidade irrisória de informação no panorama geral (apenas ~2% do dataset original)

In [None]:
# df.count() # 20.000 - 2247 (null merchant_state)
df = df.filter((df.merchant_state.isNotNull()))
df = df.drop("zip")
df = df.drop("errors")

In [None]:
# Strip the "$" sign from `amount` (it stays a string) and give `use_chip`
# the more descriptive name `transaction_type`.
df = (
    df.withColumn("amount", regexp_replace("amount", "\\$", ""))
    .withColumnRenamed("use_chip", "transaction_type")
)

In [None]:
# Preview the cleaned transactions data.
df.show(5)

+-------+-------------------+---------+-------+------+-----------------+-----------+-------------+--------------+----+
|     id|               date|client_id|card_id|amount| transaction_type|merchant_id|merchant_city|merchant_state| mcc|
+-------+-------------------+---------+-------+------+-----------------+-----------+-------------+--------------+----+
|7475327|2010-01-01 00:01:00|     1556|   2972|-77.00|Swipe Transaction|      59935|       Beulah|            ND|5499|
|7475328|2010-01-01 00:02:00|      561|   4575| 14.57|Swipe Transaction|      67570|   Bettendorf|            IA|5311|
|7475329|2010-01-01 00:02:00|     1129|    102| 80.00|Swipe Transaction|      27092|        Vista|            CA|4829|
|7475331|2010-01-01 00:05:00|      430|   2860|200.00|Swipe Transaction|      27092|  Crown Point|            IN|4829|
|7475332|2010-01-01 00:06:00|      848|   3915| 46.41|Swipe Transaction|      13051|      Harwood|            MD|5813|
+-------+-------------------+---------+-------+-

Seguindo as regras de normalização, optei por extrair os dados do estabelecimento (merchant) para uma nova tabela, mantendo na tabela fato apenas os dados a ela necessários. Essa mudança facilitará a modelagem no Power BI (criação de relacionamentos).

In [None]:
# Build the merchant dimension from the transactions frame.
# A plain select() keeps one row per transaction, so the dimension would repeat
# the same merchant many times; distinct() collapses the exact duplicates.
# NOTE(review): in the preview above merchant_id 27092 appears with two different
# cities (Vista/CA and Crown Point/IN), so merchant_id alone may still not be
# unique after de-duplication - confirm the intended key for this dimension.
df_merchant = df.select("merchant_id", "merchant_city", "merchant_state", "mcc").distinct()
save_path = "temp/merchant_data"
# Export as a single-part CSV directory with a header row.
df_merchant.coalesce(1).write.mode('overwrite').option("header", True).option("delimiter", ",").csv(save_path)

In [None]:
# Spark wrote a directory of part files under `save_path`; take the first
# .csv part and move it to the final datasets folder as dim_merchant.csv.
final_path = "temp/final_datasets"
os.makedirs(final_path, exist_ok=True)

part_files = [file for file in os.listdir(save_path) if file.endswith('.csv')]
csv_name = part_files[0]
shutil.move(f"{save_path}/{csv_name}", f"{final_path}/dim_merchant.csv")

'temp/final_datasets/dim_merchant.csv'

In [None]:
# Keep only the columns that belong to the fact table (the merchant
# attributes were selected into df_merchant) and rename the key to
# `transaction_id`.
df = df.select("id", "date", "client_id", "card_id", "merchant_id", "transaction_type", "amount")
df = df.withColumnRenamed("id", "transaction_id")
df.show(3)

+--------------+-------------------+---------+-------+-----------+-----------------+------+
|transaction_id|               date|client_id|card_id|merchant_id| transaction_type|amount|
+--------------+-------------------+---------+-------+-----------+-----------------+------+
|       7475327|2010-01-01 00:01:00|     1556|   2972|      59935|Swipe Transaction|-77.00|
|       7475328|2010-01-01 00:02:00|      561|   4575|      67570|Swipe Transaction| 14.57|
|       7475329|2010-01-01 00:02:00|     1129|    102|      27092|Swipe Transaction| 80.00|
+--------------+-------------------+---------+-------+-----------+-----------------+------+
only showing top 3 rows



Seguindo o star schema, optei por criar uma tabela dimensão das datas, para facilitar as análises de negócio de forma temporal.

In [None]:
# Derive the calendar attributes of the date dimension from the transaction timestamp.
# NOTE(review): `date_id` is generated per transaction row with
# monotonically_increasing_id(), so the date dimension built from these columns
# gets one row per transaction rather than one per calendar date - confirm
# this is the intended grain.
df = df.withColumnRenamed("date", "full_date")
df = (
    df
    .withColumn("date_id", (monotonically_increasing_id() + 1))
    .withColumn("year", year("full_date").cast("int"))
    .withColumn("month", month("full_date").cast("int"))
    .withColumn("month_name", date_format("full_date", "MMMM"))
    .withColumn("day", day("full_date").cast("int"))
    .withColumn("day_name", date_format("full_date", "EEEE"))
    .withColumn("day_of_week", dayofweek("full_date").cast("int"))
    .withColumn("day_of_month", dayofmonth("full_date").cast("int"))
    .withColumn("day_of_year", dayofyear("full_date"))
    .withColumn("week_of_year", weekofyear("full_date"))
    .withColumn("quarter", quarter("full_date").cast("int"))
    .withColumn("month_year", date_format("full_date", "MMM yyyy"))
)
df.show()

+--------------+-------------------+---------+-------+-----------+-----------------+------+-------+----+-----+----------+---+--------+-----------+------------+-----------+------------+-------+----------+
|transaction_id|          full_date|client_id|card_id|merchant_id| transaction_type|amount|date_id|year|month|month_name|day|day_name|day_of_week|day_of_month|day_of_year|week_of_year|quarter|month_year|
+--------------+-------------------+---------+-------+-----------+-----------------+------+-------+----+-----+----------+---+--------+-----------+------------+-----------+------------+-------+----------+
|       7475327|2010-01-01 00:01:00|     1556|   2972|      59935|Swipe Transaction|-77.00|      1|2010|    1|   January|  1|  Friday|          6|           1|          1|          53|      1|  Jan 2010|
|       7475328|2010-01-01 00:02:00|      561|   4575|      67570|Swipe Transaction| 14.57|      2|2010|    1|   January|  1|  Friday|          6|           1|          1|          53|

In [None]:
# Select the date key plus its calendar attributes into the date dimension,
# preview it, and export it as a single-part CSV directory with a header row.
date_dimension_columns = [
    "date_id", "year", "month", "month_name", "day", "day_name",
    "day_of_week", "day_of_month", "day_of_year", "week_of_year",
    "quarter", "month_year",
]
df_date = df.select(*date_dimension_columns)
df_date.show(3)

save_path = "temp/date_data"
(
    df_date.coalesce(1)
    .write.mode('overwrite')
    .option("header", True)
    .option("delimiter", ",")
    .csv(save_path)
)

+-------+----+-----+----------+---+--------+-----------+------------+-----------+------------+-------+----------+
|date_id|year|month|month_name|day|day_name|day_of_week|day_of_month|day_of_year|week_of_year|quarter|month_year|
+-------+----+-----+----------+---+--------+-----------+------------+-----------+------------+-------+----------+
|      1|2010|    1|   January|  1|  Friday|          6|           1|          1|          53|      1|  Jan 2010|
|      2|2010|    1|   January|  1|  Friday|          6|           1|          1|          53|      1|  Jan 2010|
|      3|2010|    1|   January|  1|  Friday|          6|           1|          1|          53|      1|  Jan 2010|
+-------+----+-----+----------+---+--------+-----------+------------+-----------+------------+-------+----------+
only showing top 3 rows



In [None]:
# Spark wrote a directory of part files under `save_path`; take the first
# .csv part and move it to the final datasets folder as dim_date.csv.
final_path = "temp/final_datasets"
os.makedirs(final_path, exist_ok=True)

part_files = [file for file in os.listdir(save_path) if file.endswith('.csv')]
csv_name = part_files[0]
shutil.move(f"{save_path}/{csv_name}", f"{final_path}/dim_date.csv")

'temp/final_datasets/dim_date.csv'

In [None]:
# The calendar attributes were selected into df_date; the fact table keeps
# only the `date_id` key that points at it.
date_attribute_columns = (
    "full_date", "year", "month", "month_name", "day", "day_name",
    "day_of_week", "day_of_month", "day_of_year", "week_of_year",
    "quarter", "month_year",
)
df = df.drop(*date_attribute_columns)
df.show(3)

+--------------+---------+-------+-----------+-----------------+------+-------+
|transaction_id|client_id|card_id|merchant_id| transaction_type|amount|date_id|
+--------------+---------+-------+-----------+-----------------+------+-------+
|       7475327|     1556|   2972|      59935|Swipe Transaction|-77.00|      1|
|       7475328|      561|   4575|      67570|Swipe Transaction| 14.57|      2|
|       7475329|     1129|    102|      27092|Swipe Transaction| 80.00|      3|
+--------------+---------+-------+-----------+-----------------+------+-------+
only showing top 3 rows



Por fim, verifico se houve casos de registros que contêm algum relacionamento nulo

In [None]:
# Referential check: count the transactions whose client_id has no matching
# user_id in the users dimension (left anti join keeps only unmatched rows).
transactions_side = df.alias("t")
users_side = df_users.alias("u")
user_key_match = df["client_id"] == df_users["user_id"]
transactions_side.join(users_side, user_key_match, "left_anti").count()  # 0

0

In [None]:
# Referential check: count the transactions whose card_id has no matching
# card_id in the cards dimension (left anti join keeps only unmatched rows).
transactions_side = df.alias("t")
cards_side = df_cards.alias("c")
card_key_match = df["card_id"] == df_cards["card_id"]
transactions_side.join(cards_side, card_key_match, "left_anti").count()  # 0

0

In [None]:
# Referential check: count the transactions whose merchant_id has no match in
# the merchant dimension (left anti join keeps only unmatched rows).
# df_merchant was selected from this same transactions frame, so
# `df["merchant_id"] == df_merchant["merchant_id"]` compares a column with
# itself (an ambiguous self-join reference). Joining on the column name lets
# Spark resolve the key on each side explicitly.
# NOTE(review): since the dimension is derived from `df`, this check is
# expected to be 0 by construction.
df.alias("t") \
    .join(df_merchant.alias("m"), on="merchant_id", how="left_anti") \
    .count() # 0

0

In [None]:
# Referential check: count the transactions whose date_id has no match in the
# date dimension (left anti join keeps only unmatched rows).
# df_date was selected from this same transactions frame, so
# `df["date_id"] == df_date["date_id"]` compares a column with itself (an
# ambiguous self-join reference). Joining on the column name lets Spark
# resolve the key on each side explicitly.
# NOTE(review): since the dimension is derived from `df`, this check is
# expected to be 0 by construction.
df.alias("t") \
    .join(df_date.alias("d"), on="date_id", how="left_anti") \
    .count() # 0

0

In [None]:
# Final fact table: `client_id` is dropped here, so the fact rows no longer
# carry a direct user key.
df_transactions = df.drop("client_id")
save_path = "temp/transaction_data"
# Write df_transactions (not `df`): the exported CSV must have the same
# columns as the frame that is later loaded to BigQuery as fact_transaction.
df_transactions.coalesce(1).write.mode('overwrite').option("header", True).option("delimiter", ",").csv(save_path)

# Spark wrote a directory of part files; take the first .csv part and move it
# to the final datasets folder as fact_transaction.csv.
csv_name = [file for file in os.listdir(save_path) if file.endswith('.csv')][0]
final_path = "temp/final_datasets"
os.makedirs(final_path, exist_ok=True)

shutil.move(f"{save_path}/{csv_name}", f"{final_path}/fact_transaction.csv")

'temp/final_datasets/fact_transaction.csv'

## Load to BigQuery

In [None]:
# Destination settings come from the environment (populated by load_dotenv).
dataset_name = os.getenv("DATASET_NAME")
project_id = os.getenv("PROJECT_ID")
temp_spark_bucket = os.getenv("TEMP_SPARK_BUCKET")

# Fail fast with a clear message: an unset variable would otherwise be
# formatted into the table name as the literal text "None".
required_settings = {
    "DATASET_NAME": dataset_name,
    "PROJECT_ID": project_id,
    "TEMP_SPARK_BUCKET": temp_spark_bucket,
}
missing_settings = [key for key, value in required_settings.items() if not value]
if missing_settings:
  raise EnvironmentError(
      f"Missing required environment variables: {', '.join(missing_settings)}"
  )

df_list = [df_transactions, df_users, df_merchant, df_cards, df_date]
df_names = ["fact_transaction", "dim_user", "dim_merchant", "dim_card", "dim_date"]
# Use a dedicated loop variable so the notebook-level `df` is not overwritten
# by whichever frame happens to be written last.
for table_df, name in zip(df_list, df_names):
  # Overwrite the <project>.<dataset>.<name> table, staging through the temporary bucket.
  table_df.write \
  .format("bigquery") \
  .option("temporaryGcsBucket", temp_spark_bucket) \
  .option("table", f"{project_id}.{dataset_name}.{name}") \
  .mode("overwrite") \
  .save()