In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Tạo SparkSession
spark = SparkSession.builder \
    .appName("Mini-Project") \
    .config("spark.jars", "file:///D:/OneDrive-QuangNgai/Data_Engineer/Mini-Project/sqljdbc_12.10/enu/jars/mssql-jdbc-12.10.0.jre11.jar") \
    .config("spark.driver.extraClassPath","file:///D:/OneDrive-QuangNgai/Data_Engineer/Mini-Project/sqljdbc_12.10/enu/jars/mssql-jdbc-12.10.0.jre11.jar:file:///D:/OneDrive-QuangNgai/Data_Engineer/Mini-Project/sqljdbc_12.10/enu/auth/x64/mssql-jdbc_auth-12.10.0.x64.dll") \
    .getOrCreate()


In [3]:
# Đọc dữ liệu từ thư mục chứa các tệp CSV
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("mobile_app/")


In [4]:
df.printSchema()

root
 |-- Event Date: integer (nullable = true)
 |-- App Name: string (nullable = true)
 |-- App Apple ID: integer (nullable = true)
 |-- Subscription Name: string (nullable = true)
 |-- Subscription Apple ID: long (nullable = true)
 |-- Subscription Group ID: integer (nullable = true)
 |-- Standard Subscription Duration: string (nullable = true)
 |-- Subscription Offer Name: string (nullable = true)
 |-- Promotional Offer ID: string (nullable = true)
 |-- Subscription Offer Type: string (nullable = true)
 |-- Subscription Offer Duration: string (nullable = true)
 |-- Marketing Opt-In Duration: string (nullable = true)
 |-- Customer Price: double (nullable = true)
 |-- Customer Currency: string (nullable = true)
 |-- Developer Proceeds: double (nullable = true)
 |-- Proceeds Currency: string (nullable = true)
 |-- Preserved Pricing: string (nullable = true)
 |-- Proceeds Reason: string (nullable = true)
 |-- Client: string (nullable = true)
 |-- Device: string (nullable = true)
 |-- Co

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, LongType

# Định nghĩa schema chính xác
correct_schema = StructType([
    StructField("Event Date", IntegerType(), True),  
    StructField("App Name", StringType(), True),
    StructField("App Apple ID", LongType(), True),
    StructField("Subscription Name", StringType(), True),
    StructField("Subscription Apple ID", LongType(), True),
    StructField("Subscription Group ID", LongType(), True),
    StructField("Standard Subscription Duration", StringType(), True),
    StructField("Subscription Offer Name", StringType(), True),
    StructField("Promotional Offer ID", StringType(), True),
    StructField("Subscription Offer Type", StringType(), True),
    StructField("Subscription Offer Duration", StringType(), True),
    StructField("Marketing Opt-In Duration", StringType(), True),
    StructField("Customer Price", DecimalType(12, 2), True),
    StructField("Customer Currency", StringType(), True),
    StructField("Developer Proceeds", DecimalType(12, 2), True),
    StructField("Proceeds Currency", StringType(), True),
    StructField("Preserved Pricing", StringType(), True),
    StructField("Proceeds Reason", StringType(), True),
    StructField("Client", StringType(), True),
    StructField("Device", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Subscriber ID", StringType(), True), 
    StructField("Subscriber ID Reset", StringType(), True),
    StructField("Refund", StringType(), True),
    StructField("Purchase Date", IntegerType(), True), 
    StructField("Units", DecimalType(12, 2), True)
])

# Đọc dữ liệu với schema đã định nghĩa
df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(correct_schema) \
    .load("mobile_app/")

In [6]:
df.printSchema()

root
 |-- Event Date: integer (nullable = true)
 |-- App Name: string (nullable = true)
 |-- App Apple ID: long (nullable = true)
 |-- Subscription Name: string (nullable = true)
 |-- Subscription Apple ID: long (nullable = true)
 |-- Subscription Group ID: long (nullable = true)
 |-- Standard Subscription Duration: string (nullable = true)
 |-- Subscription Offer Name: string (nullable = true)
 |-- Promotional Offer ID: string (nullable = true)
 |-- Subscription Offer Type: string (nullable = true)
 |-- Subscription Offer Duration: string (nullable = true)
 |-- Marketing Opt-In Duration: string (nullable = true)
 |-- Customer Price: decimal(12,2) (nullable = true)
 |-- Customer Currency: string (nullable = true)
 |-- Developer Proceeds: decimal(12,2) (nullable = true)
 |-- Proceeds Currency: string (nullable = true)
 |-- Preserved Pricing: string (nullable = true)
 |-- Proceeds Reason: string (nullable = true)
 |-- Client: string (nullable = true)
 |-- Device: string (nullable = true)

In [7]:
df.show(5, truncate=False)

+----------+------------------------------+------------+-----------------+---------------------+---------------------+------------------------------+-----------------------+--------------------+-----------------------+---------------------------+-------------------------+--------------+-----------------+------------------+-----------------+-----------------+---------------+------+------+-------+---------------+-------------------+------+-------------+-----+
|Event Date|App Name                      |App Apple ID|Subscription Name|Subscription Apple ID|Subscription Group ID|Standard Subscription Duration|Subscription Offer Name|Promotional Offer ID|Subscription Offer Type|Subscription Offer Duration|Marketing Opt-In Duration|Customer Price|Customer Currency|Developer Proceeds|Proceeds Currency|Preserved Pricing|Proceeds Reason|Client|Device|Country|Subscriber ID  |Subscriber ID Reset|Refund|Purchase Date|Units|
+----------+------------------------------+------------+-----------------+--

In [8]:
df.count()

19644

In [9]:
duplicates = df.groupBy(df.columns).count().filter(col("count") > 1)

# Hiển thị các bản ghi duplicate
duplicates.show(truncate=False)

+----------+-----------------------------+------------+-----------------+---------------------+---------------------+------------------------------+-----------------------+--------------------+-----------------------+---------------------------+-------------------------+--------------+-----------------+------------------+-----------------+-----------------+---------------+------+------+-------+-------------+-------------------+------+-------------+-----+-----+
|Event Date|App Name                     |App Apple ID|Subscription Name|Subscription Apple ID|Subscription Group ID|Standard Subscription Duration|Subscription Offer Name|Promotional Offer ID|Subscription Offer Type|Subscription Offer Duration|Marketing Opt-In Duration|Customer Price|Customer Currency|Developer Proceeds|Proceeds Currency|Preserved Pricing|Proceeds Reason|Client|Device|Country|Subscriber ID|Subscriber ID Reset|Refund|Purchase Date|Units|count|
+----------+-----------------------------+------------+---------------

In [10]:
# Loại bỏ các bản ghi duplicate
df = df.dropDuplicates()

In [11]:
df.count()

19641

#### Dim_Date

In [12]:
# Tính toán min và max date trực tiếp từ DataFrame gốc
min_event_date_df = df.select(min(to_date(col("Event Date"), "yyyyMMdd")).alias("min_event_date"))
min_purchase_date_df = df.select(min(to_date(col("Purchase Date"), "yyyyMMdd")).alias("min_purchase_date"))
max_date_df = df.select(max(to_date(col("Event Date"), "yyyyMMdd")).alias("max_date"))

# Lấy giá trị cụ thể
min_event_date = min_event_date_df.first()["min_event_date"]
min_purchase_date = min_purchase_date_df.first()["min_purchase_date"]
max_date = max_date_df.first()["max_date"]

# Tìm ngày nhỏ nhất từ cả hai cột
min_date = min_event_date if min_event_date < min_purchase_date else min_purchase_date

print(f"Min event date: {min_event_date}")
print(f"Min purchase date: {min_purchase_date}")
print(f"Min date (used for dimension): {min_date}")
print(f"Max date: {max_date}")

# Tạo date range
days_diff = (max_date - min_date).days
date_range_df = spark.range(0, days_diff + 1).withColumn(
    "Event Date", 
    expr(f"date_add(cast('{min_date}' as date), cast(id as int))")
).drop("id")

print('Number of dates in range:', date_range_df.count())

Min event date: 2023-09-02
Min purchase date: 2023-05-12
Min date (used for dimension): 2023-05-12
Max date: 2024-01-15
Number of dates in range: 249


In [13]:
# Áp dung các phép biến đổi để tạo ra bảng ngày hoàn chỉnh
complete_dim_date = date_range_df \
    .withColumn("DateKey", date_format("Event Date", "yyyyMMdd").cast("int")) \
    .withColumn("Day", dayofmonth("Event Date")) \
    .withColumn("Month", month("Event Date")) \
    .withColumn("Quarter", quarter("Event Date")) \
    .withColumn("Year", year("Event Date")) \
    .withColumn("DayOfWeek", dayofweek("Event Date")) \
    .withColumn("DayOfWeekName", date_format("Event Date", "EEEE")) \
    .withColumn("MonthName", date_format("Event Date", "MMMM")) \
    .withColumn("WeekOfYear", weekofyear("Event Date")) \
    .withColumn("MonthYear", date_format("Event Date", "yyyy-MM")) \
    .drop("Event Date")

# Đếm số lượng ngày trong khoảng thời gian hoàn chỉnh
print(f"Number of dates in complete range: {complete_dim_date.count()}")

# Hiển thị 5 ngày đầu tiên trong bảng ngày hoàn chỉnh
complete_dim_date.show(5)

Number of dates in complete range: 249
+--------+---+-----+-------+----+---------+-------------+---------+----------+---------+
| DateKey|Day|Month|Quarter|Year|DayOfWeek|DayOfWeekName|MonthName|WeekOfYear|MonthYear|
+--------+---+-----+-------+----+---------+-------------+---------+----------+---------+
|20230512| 12|    5|      2|2023|        6|       Friday|      May|        19|  2023-05|
|20230513| 13|    5|      2|2023|        7|     Saturday|      May|        19|  2023-05|
|20230514| 14|    5|      2|2023|        1|       Sunday|      May|        19|  2023-05|
|20230515| 15|    5|      2|2023|        2|       Monday|      May|        20|  2023-05|
|20230516| 16|    5|      2|2023|        3|      Tuesday|      May|        20|  2023-05|
+--------+---+-----+-------+----+---------+-------------+---------+----------+---------+
only showing top 5 rows



In [14]:
# Kiêm tra các ngày không có trong dữ liệu gốc
df.filter(col("Event Date").isNull()).show(truncate=False)

+----------+------------------------------+------------+-----------------+---------------------+---------------------+------------------------------+-----------------------+--------------------+-----------------------+---------------------------+-------------------------+--------------+-----------------+------------------+-----------------+-----------------+---------------+------+------+-------+---------------+-------------------+------+-------------+-----+
|Event Date|App Name                      |App Apple ID|Subscription Name|Subscription Apple ID|Subscription Group ID|Standard Subscription Duration|Subscription Offer Name|Promotional Offer ID|Subscription Offer Type|Subscription Offer Duration|Marketing Opt-In Duration|Customer Price|Customer Currency|Developer Proceeds|Proceeds Currency|Preserved Pricing|Proceeds Reason|Client|Device|Country|Subscriber ID  |Subscriber ID Reset|Refund|Purchase Date|Units|
+----------+------------------------------+------------+-----------------+--

In [15]:

# Thay thế giá trị NULL trong cột "Event Date" và "Purchase Date" bằng 19000101
df = df.withColumn("Event Date", coalesce(col("Event Date"), lit(19000101)))
df = df.withColumn("Purchase Date", coalesce(col("Purchase Date"), lit(19000101)))


# Xác nhận thay thế thành công
df.filter(col("Event Date") == 19000101).show(5)
df.filter(col("Purchase Date") == 19000101).show(5)

+----------+--------------------+------------+-----------------+---------------------+---------------------+------------------------------+-----------------------+--------------------+-----------------------+---------------------------+-------------------------+--------------+-----------------+------------------+-----------------+-----------------+---------------+------+------+-------+---------------+-------------------+------+-------------+-----+
|Event Date|            App Name|App Apple ID|Subscription Name|Subscription Apple ID|Subscription Group ID|Standard Subscription Duration|Subscription Offer Name|Promotional Offer ID|Subscription Offer Type|Subscription Offer Duration|Marketing Opt-In Duration|Customer Price|Customer Currency|Developer Proceeds|Proceeds Currency|Preserved Pricing|Proceeds Reason|Client|Device|Country|  Subscriber ID|Subscriber ID Reset|Refund|Purchase Date|Units|
+----------+--------------------+------------+-----------------+---------------------+----------

In [16]:
# Thêm bản ghi thể hiện ngày không xác định vào bảng ngày
unknown_date = spark.createDataFrame([
    (19000101, 1, 1, 1, 1900, 1, "Monday", "January", 1, "1900-01")
], ["DateKey", "Day", "Month", "Quarter", "Year", "DayOfWeek", "DayOfWeekName", "MonthName", "WeekOfYear", "MonthYear"])

complete_dim_date = complete_dim_date.union(unknown_date)

In [17]:
# Kiểm tra xem ngày không xác định đã được thêm vào bảng ngày hoàn chỉnh
complete_dim_date.filter(col("DateKey")==19000101).show()

+--------+---+-----+-------+----+---------+-------------+---------+----------+---------+
| DateKey|Day|Month|Quarter|Year|DayOfWeek|DayOfWeekName|MonthName|WeekOfYear|MonthYear|
+--------+---+-----+-------+----+---------+-------------+---------+----------+---------+
|19000101|  1|    1|      1|1900|        1|       Monday|  January|         1|  1900-01|
+--------+---+-----+-------+----+---------+-------------+---------+----------+---------+



#### Dim_App

In [18]:
# Chọn các cột cần thiết cho Dimension App
dim_app = df.select(
    col("App Apple ID").alias("AppAppleID"),
    col("App Name").alias("AppName")
).distinct()

dim_app = dim_app.withColumn("AppName", regexp_replace("AppName", "[^a-zA-Z0-9\\s]", "")).distinct()
dim_app.show(20, truncate=False)

+----------+-----------------------------+
|AppAppleID|AppName                      |
+----------+-----------------------------+
|1597094402|Screen Mirroring SmartTV Cast|
|1644910903|Universal TV Remote Control  |
+----------+-----------------------------+



#### Dim_Subscription

In [19]:
# Chọn các cột cần thiết cho Dimension Subscription
dim_subscription = df.select(
    col("Subscription Apple ID").alias("SubscriptionAppleID"),
    col("Subscription Name").alias("SubscriptionName"),                   
    col("Subscription Group ID").alias("SubscriptionGroupID"),
).distinct()

dim_subscription.show()

+-------------------+-------------------+-------------------+
|SubscriptionAppleID|   SubscriptionName|SubscriptionGroupID|
+-------------------+-------------------+-------------------+
|         6445357999|      Week sale-off|           21100689|
|         1597465785|             Yearly|           20905769|
|         6445194899|        Yearly sale|           21100689|
|         6463114518|        Year Upsale|           20905769|
|         6463854633|    Monthly Upgrade|           21017950|
|         6472164305|  Weekly Black Sale|           21017950|
|         6445194679|        Weekly sale|           21100689|
|         1634176435|        Yearly sale|           20905769|
|         1629807112|     Weekly Premium|           20905769|
|         1644911963|             Yearly|           21017950|
|         1644911334|             Weekly|           21017950|
|         6475703813|           Yearly 2|           20905769|
|         1597465274|            Monthly|           20905769|
|       

#### Dictionary_ExchangeRates

In [20]:
# Xem các giá trị của "Customer Currency"
df.select("Customer Currency").distinct().show()

+-----------------+
|Customer Currency|
+-----------------+
|              DKK|
|              MYR|
|              NZD|
|              HUF|
|              GBP|
|              CHF|
|              BRL|
|              CZK|
|              COP|
|              IDR|
|              TRY|
|              CAD|
|              AED|
|              SAR|
|              EUR|
|              TWD|
|              THB|
|              ZAR|
|              PKR|
|              ILS|
+-----------------+
only showing top 20 rows



In [None]:
import requests
from datetime import datetime
from pyspark.sql import Row

# URL API cung cấp dữ liệu tỷ giá USD
# https://cdn.jsdelivr.net/npm/@fawazahmed0/currency-api@latest/v1/currencies/usd.json
url = "https://cdn.jsdelivr.net/npm/@fawazahmed0/currency-api@2025-06-03/v1/currencies/usd.json"

# Gọi API và lấy dữ liệu JSON
response = requests.get(url)
data = response.json()

# Lấy thông tin ngày từ key "date" (ví dụ: "2025-06-03")
rate_date_str = data.get("date")
rate_date = datetime.strptime(rate_date_str, "%Y-%m-%d").date()  # chuyển thành kiểu date

# Lấy dictionary chứa các tỷ giá từ USD (key: mã tiền, value: tỷ giá)
rates = data.get("usd", {})
exchange_rates_dict = {k.upper(): float(v) for k, v in rates.items() if len(k) == 3}

# Hiển thị dictionary tỷ giá để kiểm tra
print(f"Total currencies in dictionary: {len(exchange_rates_dict)}")
print("Sample exchange rates:", dict(list(exchange_rates_dict.items())[:5]))

'''
# Tạo danh sách các bản ghi, chuyển đổi currency code sang chữ in hoa
rows = [Row(CurrencyCode=k.upper(), RateDate=rate_date, RateToUSD=float(v))
        for k, v in rates.items() if len(k) == 3]

# Tạo DataFrame với cột theo cấu trúc bảng Dim_ExchangeRates
columns = ["CurrencyCode", "RateDate", "RateToUSD"]
dim_exchange_rates = spark.createDataFrame(rows, schema=columns)

# Hiển thị DataFrame
dim_exchange_rates.show(truncate=False)
'''


Total currencies in dictionary: 288
Sample exchange rates: {'ADA': 1.36962165, 'AED': 3.6725, 'AFN': 69.44275238, 'AKT': 0.71146196, 'ALL': 86.09091841}


#### Dim_Country

In [23]:
# Kiểm tra các giá trị NULL trong cột "Country"
df.filter(col("Country").isNull()).show(truncate=False)

+----------+------------------------------+------------+-----------------+---------------------+---------------------+------------------------------+-----------------------+--------------------+-----------------------+---------------------------+-------------------------+--------------+-----------------+------------------+-----------------+-----------------+---------------+------+------+-------+---------------+-------------------+------+-------------+-----+
|Event Date|App Name                      |App Apple ID|Subscription Name|Subscription Apple ID|Subscription Group ID|Standard Subscription Duration|Subscription Offer Name|Promotional Offer ID|Subscription Offer Type|Subscription Offer Duration|Marketing Opt-In Duration|Customer Price|Customer Currency|Developer Proceeds|Proceeds Currency|Preserved Pricing|Proceeds Reason|Client|Device|Country|Subscriber ID  |Subscriber ID Reset|Refund|Purchase Date|Units|
+----------+------------------------------+------------+-----------------+--

In [24]:
# Thay thế các giá trị NULL trong cột "Country" bằng "UNK"
df = df.withColumn("Country", coalesce(col("Country"), lit("UNK")))
df.filter(col("Country") == "UNK").show(5, truncate=False)

+----------+------------------------------+------------+-----------------+---------------------+---------------------+------------------------------+-----------------------+--------------------+-----------------------+---------------------------+-------------------------+--------------+-----------------+------------------+-----------------+-----------------+---------------+------+------+-------+---------------+-------------------+------+-------------+-----+
|Event Date|App Name                      |App Apple ID|Subscription Name|Subscription Apple ID|Subscription Group ID|Standard Subscription Duration|Subscription Offer Name|Promotional Offer ID|Subscription Offer Type|Subscription Offer Duration|Marketing Opt-In Duration|Customer Price|Customer Currency|Developer Proceeds|Proceeds Currency|Preserved Pricing|Proceeds Reason|Client|Device|Country|Subscriber ID  |Subscriber ID Reset|Refund|Purchase Date|Units|
+----------+------------------------------+------------+-----------------+--

In [26]:
import requests
from bs4 import BeautifulSoup
# URL của trang Wikipedia chứa danh sách ISO 3166 country codes
url = 'https://www.iban.com/country-codes'
response = requests.get(url)
if response.status_code == 200:
    soup = BeautifulSoup(response.content, 'html.parser')
    
    table = soup.find('table', id='myTable')
    
    rows = table.find('tbody').find_all('tr')
    
    data = []
    for row in rows:
        cols = row.find_all('td')
        if cols:
            # Cột đầu là CountryName, cột thứ hai là CountryCode (Alpha-2 code)
            country = cols[0].get_text(strip=True)
            alpha2 = cols[1].get_text(strip=True)
            data.append({'CountryName': country, 'CountryCode': alpha2})
    
    dim_country = spark.createDataFrame(data)
    dim_country.show(10, truncate=False)
else:
    print("Không truy cập được vào trang, hãy kiểm tra lại URL hoặc kết nối của bạn.")

+-----------+-------------------+
|CountryCode|CountryName        |
+-----------+-------------------+
|AF         |Afghanistan        |
|AX         |Åland Islands      |
|AL         |Albania            |
|DZ         |Algeria            |
|AS         |American Samoa     |
|AD         |Andorra            |
|AO         |Angola             |
|AI         |Anguilla           |
|AQ         |Antarctica         |
|AG         |Antigua and Barbuda|
+-----------+-------------------+
only showing top 10 rows



In [27]:
# Thêm một bản ghi cho quốc gia không xác định vào Dimension Country
unknown_country = spark.createDataFrame([
    ("UNK", "Unknown Country")
], ["CountryCode", "CountryName"])

# Add the unknown country to your dimension
dim_country_complete = dim_country.union(unknown_country)

In [28]:
# Xác nhận rằng bản ghi "UNK" đã được thêm vào bảng Dim_Country
dim_country_complete.filter(col("CountryCode") == "UNK").show()

+-----------+---------------+
|CountryCode|    CountryName|
+-----------+---------------+
|        UNK|Unknown Country|
+-----------+---------------+



#### Dim_Subscriber

In [29]:
# Kiểm tra các giá trị là rỗng trong cột "Subscriber ID"
df.filter(trim(col("Subscriber ID")) == "").show(truncate=False)

+----------+------------------------------+------------+-----------------+---------------------+---------------------+------------------------------+-----------------------+--------------------+-----------------------+---------------------------+-------------------------+--------------+-----------------+------------------+-----------------+-----------------+---------------+------+------+-------+-------------+-------------------+------+-------------+-----+
|Event Date|App Name                      |App Apple ID|Subscription Name|Subscription Apple ID|Subscription Group ID|Standard Subscription Duration|Subscription Offer Name|Promotional Offer ID|Subscription Offer Type|Subscription Offer Duration|Marketing Opt-In Duration|Customer Price|Customer Currency|Developer Proceeds|Proceeds Currency|Preserved Pricing|Proceeds Reason|Client|Device|Country|Subscriber ID|Subscriber ID Reset|Refund|Purchase Date|Units|
+----------+------------------------------+------------+-----------------+------

In [30]:
# Thay thế các giá trị rỗng trong cột "Subscriber ID" dựa trên cột "Subscriber ID Reset"
df = df.withColumn("Subscriber ID", 
    when((trim(col("Subscriber ID")) == "") & (col("Subscriber ID Reset").isNull()), "UNKNOWN_NULL")
    .when((trim(col("Subscriber ID")) == "") & (col("Subscriber ID Reset") == "Yes"), "UNKNOWN_YES")
    .otherwise(col("Subscriber ID")))

In [31]:
from pyspark.sql.types import StringType, StructType, StructField

schema = StructType([
    StructField("SubscriberID", StringType(), True),
    StructField("SubscriberIDReset", StringType(), True)
])

unknown_subscriber = spark.createDataFrame([
    ("UNKNOWN_NULL", "NULL"),
    ("UNKNOWN_YES", "Yes")
], schema)


In [32]:
dim_subscriber = df.select(
    col("Subscriber ID").alias("SubscriberID"),
    col("Subscriber ID Reset").alias("SubscriberIDReset")
).distinct()

dim_subscriber.show(5)


+---------------+-----------------+
|   SubscriberID|SubscriberIDReset|
+---------------+-----------------+
|210585693248331|              Yes|
|700484934443072|              Yes|
|609605232465315|              Yes|
|238698869259496|              Yes|
|266667262379240|              Yes|
+---------------+-----------------+
only showing top 5 rows



In [33]:
dim_subscriber = dim_subscriber.withColumn("SubscriberIDReset", coalesce(col("SubscriberIDReset"), lit("NULL")))
dim_subscriber_agg = dim_subscriber.groupBy("SubscriberID").agg(
    countDistinct("SubscriberIDReset").alias("distinct_reset_count")
)
# Lọc những subscriber có nhiều hơn 1 giá trị khác nhau ở cột reset
subscriber_diff = dim_subscriber_agg.filter(col("distinct_reset_count") > 1)
subscriber_diff.show(truncate=False)

+---------------+--------------------+
|SubscriberID   |distinct_reset_count|
+---------------+--------------------+
|455577881137507|2                   |
+---------------+--------------------+



In [34]:
df.filter(col("Subscriber ID") == "455577881137507").show(truncate=False)

+----------+-----------------------------+------------+-----------------+---------------------+---------------------+------------------------------+-----------------------+--------------------+-----------------------+---------------------------+-------------------------+--------------+-----------------+------------------+-----------------+-----------------+---------------+------+------+-------+---------------+-------------------+------+-------------+-----+
|Event Date|App Name                     |App Apple ID|Subscription Name|Subscription Apple ID|Subscription Group ID|Standard Subscription Duration|Subscription Offer Name|Promotional Offer ID|Subscription Offer Type|Subscription Offer Duration|Marketing Opt-In Duration|Customer Price|Customer Currency|Developer Proceeds|Proceeds Currency|Preserved Pricing|Proceeds Reason|Client|Device|Country|Subscriber ID  |Subscriber ID Reset|Refund|Purchase Date|Units|
+----------+-----------------------------+------------+-----------------+-----

In [35]:
# Gom nhóm theo SubscriberID và thu thập tập các giá trị SubscriberIDReset (set không trùng lặp)
dim_subscriber_grouped = dim_subscriber.groupBy("SubscriberID") \
    .agg(collect_set("SubscriberIDReset").alias("reset_values"))

# Tạo cột SubscriberIDReset mới: nếu trong tập reset_values có "Yes" thì gán "Yes", ngược lại gán "NULL"
dim_subscriber_final = dim_subscriber_grouped.withColumn(
    "SubscriberIDReset",
    when(array_contains(col("reset_values"), "Yes"), lit("Yes"))
    .otherwise(lit("NULL"))
).drop("reset_values")

dim_subscriber_final.show(truncate=False)

+----------------+-----------------+
|SubscriberID    |SubscriberIDReset|
+----------------+-----------------+
|1442000182691007|NULL             |
|1449000167645592|NULL             |
|1449000169356693|NULL             |
|1449000177180257|NULL             |
|1449000178273951|NULL             |
|1449000179958256|NULL             |
|1449000185583911|NULL             |
|1449000198683851|NULL             |
|1477000180579275|NULL             |
|1519000199096970|NULL             |
|1533001263383305|NULL             |
|1533001408085975|NULL             |
|1533001986516674|NULL             |
|1533002859464127|NULL             |
|154556356688771 |NULL             |
|154557547972823 |NULL             |
|154560619206863 |NULL             |
|154561122997108 |NULL             |
|154561240771912 |NULL             |
|154561516670304 |NULL             |
+----------------+-----------------+
only showing top 20 rows



In [36]:
# Thêm bản ghi cho các Subscriber không xác định vào Dimension Subscriber
dim_subscriber_complete = dim_subscriber_final.union(unknown_subscriber).distinct()

In [37]:
# Xác nhận rằng bản ghi "UNKNOWN_NULL" đã được thêm vào bảng Dim_Subscriber
dim_subscriber_complete.filter(col("SubscriberID") == "UNKNOWN_NULL").show()

+------------+-----------------+
|SubscriberID|SubscriberIDReset|
+------------+-----------------+
|UNKNOWN_NULL|             NULL|
+------------+-----------------+



#### Fact_SubscriptionSales

In [38]:
fact_subscription_sales = df.select(
    # Dimension keys
    col("Event Date").alias("EventDateKey"),
    col("App Apple ID").alias("AppKey"),
    col("Subscription Apple ID").alias("SubscriptionKey"),
    col("Country").alias("CountryKey"),
    col("Subscriber ID").alias("SubscriberKey"),
    col("Purchase Date").alias("PurchaseDateKey"),

    
    # Measures
    col("Customer Price").alias("CustomerPrice"),
    col("Customer Currency").alias("CustomerCurrency"),
    col("Developer Proceeds").alias("DeveloperProceeds"),
    col("Proceeds Currency").alias("ProceedsCurrency"),
    col("Device").alias("Device"),
    col("Refund").alias("Refund"),
    coalesce(col("Units"), lit(1)).alias("Units")
)

# Hiển thị số lượng bản ghi trong bảng fact và 5 bản ghi đầu tiên
print(f"Number of rows in fact table: {fact_subscription_sales.count()}")
fact_subscription_sales.show(5, truncate=False)

Number of rows in fact table: 19641
+------------+----------+---------------+----------+---------------+---------------+-------------+----------------+-----------------+----------------+------+------+-----+
|EventDateKey|AppKey    |SubscriptionKey|CountryKey|SubscriberKey  |PurchaseDateKey|CustomerPrice|CustomerCurrency|DeveloperProceeds|ProceedsCurrency|Device|Refund|Units|
+------------+----------+---------------+----------+---------------+---------------+-------------+----------------+-----------------+----------------+------+------+-----+
|20240113    |1644910903|1644911334     |US        |266667164527997|19000101       |6.99         |USD             |4.90             |USD             |iPhone|NULL  |1.00 |
|20240113    |1644910903|1644911334     |US        |553402396957258|19000101       |0.00         |USD             |0.00             |USD             |iPhone|NULL  |1.00 |
|20240113    |1597094402|1597465785     |US        |602610898912073|19000101       |34.99        |USD        

In [39]:
# Tạo hàm UDF để chuyển đổi giá trị sang USD
def convert_to_usd(value, currency):
    if value is None or currency is None:
        return None
    
    rate = exchange_rates_dict.get(currency, 1.0)  # Mặc định là 1.0 nếu không tìm thấy tỷ giá
    return float(value) * (1.0 / rate)  # Đổi từ loại tiền khác sang USD

# Đăng ký UDF
convert_to_usd_udf = udf(convert_to_usd)

In [40]:
# Thêm cột giá tiền USD
fact_subscription_sales = fact_subscription_sales.withColumn(
    "CustomerPriceUSD", 
    convert_to_usd_udf(col("CustomerPrice"), col("CustomerCurrency"))
)

In [43]:
fact_subscription_sales.show(20, truncate=False)

+------------+----------+---------------+----------+---------------+---------------+-------------+----------------+-----------------+----------------+------+------+-----+-----------------+
|EventDateKey|AppKey    |SubscriptionKey|CountryKey|SubscriberKey  |PurchaseDateKey|CustomerPrice|CustomerCurrency|DeveloperProceeds|ProceedsCurrency|Device|Refund|Units|CustomerPriceUSD |
+------------+----------+---------------+----------+---------------+---------------+-------------+----------------+-----------------+----------------+------+------+-----+-----------------+
|20240113    |1644910903|1644911334     |US        |266667164527997|19000101       |6.99         |USD             |4.90             |USD             |iPhone|NULL  |1.00 |6.99             |
|20240113    |1644910903|1644911334     |US        |553402396957258|19000101       |0.00         |USD             |0.00             |USD             |iPhone|NULL  |1.00 |0.0              |
|20240113    |1597094402|1597465785     |US        |602

In [44]:
# Connection properties - DESKTOP-P25M6CJ
jdbc_url = "jdbc:sqlserver://DESKTOP-P25M6CJ:1433;databaseName=DW_AppleSubscriberReport;integratedSecurity=true;encrypt=true;trustServerCertificate=true"
connection_properties = {
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Hàm để ghi DataFrame vào SQL Server
def write_to_sql_server(df, table_name, mode="append"):
    try:
        df.write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode=mode,
            properties=connection_properties
        )
        print(f"Successfully wrote {df.count()} rows to {table_name}")
    except Exception as e:
        print(f"Error writing to {table_name}: {str(e)}")

# Ghi các bảng dimension vào SQL Server trước
write_to_sql_server(complete_dim_date, "Dim_Date")
write_to_sql_server(dim_app, "Dim_App")
write_to_sql_server(dim_subscription, "Dim_Subscription")
write_to_sql_server(dim_country_complete, "Dim_Country")
write_to_sql_server(dim_subscriber_complete, "Dim_Subscriber")

# Ghi bảng fact vào SQL Server cuối cùng
write_to_sql_server(fact_subscription_sales, "Fact_SubscriptionSales")

Successfully wrote 250 rows to Dim_Date
Successfully wrote 2 rows to Dim_App
Successfully wrote 17 rows to Dim_Subscription
Successfully wrote 250 rows to Dim_Country
Successfully wrote 15417 rows to Dim_Subscriber
Successfully wrote 19641 rows to Fact_SubscriptionSales


In [48]:
df.select(
    col("Event Date").alias("EventDateKey"),
    col("Purchase Date").alias("PurchaseDateKey"),
    col("App Apple ID").alias("AppKey"),
    col("Subscription Apple ID").alias("SubscriptionKey"),
    col("Subscriber ID").alias("SubscriberKey"),
    col("Device").alias("Device")
).distinct().count()

19641

In [3]:
spark.stop()