In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import psycopg2
import os

In [2]:
spark = SparkSession.builder \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.7.0") \
        .master("local") \
        .appName("PySpark_Postgres").getOrCreate()

## Top Contries from where customers come from

## Extract - Transform

In [5]:
df_city = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5433/data_warehouse") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "city") \
    .option("user", "postgres") \
    .option("password", "password.1").load()
df_city.createOrReplaceTempView("city")

In [6]:
df_country = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5433/data_warehouse") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "country") \
    .option("user", "postgres") \
    .option("password", "password.1").load()
df_country.createOrReplaceTempView("country")

In [7]:
df_customer = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5433/data_warehouse") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "customer") \
    .option("user", "postgres") \
    .option("password", "password.1").load()
df_customer.createOrReplaceTempView("customer")

In [8]:
df_address = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5433/data_warehouse") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "address") \
    .option("user", "postgres") \
    .option("password", "password.1").load()
df_address.createOrReplaceTempView("address")

In [11]:
df_result = spark.sql('''
SELECT
    country,
    COUNT(country) as total,
    current_date() as date,
    'suhendar' as data_owner
  FROM customer
  JOIN address ON customer.address_id = address.address_id
  JOIN city ON address.city_id = city.city_id
  JOIN country ON city.country_id = country.country_id
  GROUP BY country
  ORDER BY total DESC
''')

In [12]:
df_result.write.mode('overwrite') \
    .partitionBy('date') \
    .option('compression', 'snappy') \
    .option('partitionOverwriteMode', 'dynamic') \
    .save('data_result_1')

## Load

In [13]:
from sqlalchemy import create_engine
import pandas as pd

df = pd.read_parquet('data_result_1')

engine = create_engine(
    'mysql+mysqlconnector://4FFFhK9fXu6JayE.root:9v07S0pKe4ZYCkjE@gateway01.ap-southeast-1.prod.aws.tidbcloud.com:4000/project3',
    echo=False)
df.to_sql(name='top_country', con=engine, if_exists='append')

217

In [14]:
# API with Digest Authentication

import requests
import pandas as pd
from requests.auth import HTTPDigestAuth

# API endpoint URL
owner = "suhendar"
url = f"https://ap-southeast-1.data.tidbcloud.com/api/v1beta/app/dataapp-DahqjozD/endpoint/test/top_country?owner={owner}"

# Authentication credentials (replace with your actual credentials)
username = "O1QCLVK0"
password = "e886a84e-0b14-4091-b167-afef27a23b04"

# Create a Digest Authentication object
auth = HTTPDigestAuth(username, password)

# Request payload (if any)
payload = {}

# Request headers (if any)
headers = {}

# Make the API request with Digest Authentication
response = requests.request("GET", url, headers=headers, data=payload, auth=auth)

# Convert the response to a Pandas DataFrame (if applicable)
pd.DataFrame(response.json()['data']['rows'])

Unnamed: 0,country,data_owner,date,index,total
0,India,suhendar,2024-12-14,31,60
1,India,suhendar,2024-12-14,31,60
2,India,suhendar,2024-12-15,109,60
3,China,suhendar,2024-12-14,33,53
4,China,suhendar,2024-12-15,110,53
...,...,...,...,...,...
321,Nepal,suhendar,2024-12-15,212,1
322,Bahrain,suhendar,2024-12-15,213,1
323,Hungary,suhendar,2024-12-15,214,1
324,"Virgin Islands, U.S.",suhendar,2024-12-15,215,1
