In [1]:

import findspark


findspark.init()


In [1]:
from dotenv import load_dotenv
import os

In [2]:
import os
print(os.environ.get('JAVA_HOME'))
print(os.environ.get('SPARK_HOME'))


/usr/lib/jvm/java-8-openjdk
/home/hadoop/spark-3.5.5-bin-hadoop3


In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Read from PostgreSQL") \
    .config("spark.jars", "./jars/postgresql-42.6.0.jar") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .getOrCreate()


25/06/01 21:52:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
jdbc_url = "jdbc:postgresql://localhost:5432/sales_analytics"

table_name = "salesfact"
properties = {
    "user": "your_user",
    "password": "your_password", 
    "driver": "org.postgresql.Driver"
}


In [3]:
try:
    df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)
    df.show()
except Exception as e:
    print(f"Error: {e}")

                                                                                

+---------+----------+-----+-----+----------+----------+
|productid|      date|  zip|units|   revenue|      cogs|
+---------+----------+-----+-----+----------+----------+
|     1003|2010-09-23|91711|    1|$1.038,87 |$1.120,32 |
|     1003|2011-07-24|94614|    1|$1.038,87 |$1.120,32 |
|     1003|2012-06-24|83843|    1|$1.038,87 |$1.120,32 |
|     1003|2011-06-01|36784|    1|$1.038,87 |$1.120,32 |
|     1014|2010-07-11|46360|    1|$1.038,87 |  $951,40 |
|     1014|2011-06-25|21723|    1|$1.038,87 |  $951,40 |
|     1014|2010-11-27|46815|    1|$1.038,87 |  $951,40 |
|     1014|2010-07-21|46058|    1|$1.038,87 |  $951,40 |
|     1014|2010-08-30|28081|    1|$1.038,87 |  $951,40 |
|     1014|2010-07-10|24112|    1|$1.038,87 |  $951,40 |
|     1014|2010-07-21|47630|    1|$1.038,87 |  $951,40 |
|     1014|2012-04-18|46360|    1|$1.038,87 |  $951,40 |
|     1014|2010-07-07|44406|    1|$1.038,87 |  $951,40 |
|     1014|2010-07-26|11967|    1|$1.038,87 |  $951,40 |
|     1014|2010-12-30|49779|   

In [4]:
df.printSchema()

root
 |-- productid: string (nullable = true)
 |-- date: date (nullable = true)
 |-- zip: string (nullable = true)
 |-- units: integer (nullable = true)
 |-- revenue: string (nullable = true)
 |-- cogs: string (nullable = true)



In [5]:
df.describe().show()

25/06/01 21:52:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:>                                                          (0 + 1) / 1]

+-------+------------------+-----------------+------------------+-----------+--------+
|summary|         productid|              zip|             units|    revenue|    cogs|
+-------+------------------+-----------------+------------------+-----------+--------+
|  count|            976243|           976243|            976243|     976202|  976243|
|   mean|1223.3887884471387|55683.27790314502|1.0299792162402188|       NULL|    NULL|
| stddev| 693.3112657737689|  28954.450426042|0.3266693786559363|       NULL|    NULL|
|    min|                 1|            10001|                 1| $1.006,74 |  $0,00 |
|    max|               999|            99950|                44|$98.510,58 |$996,85 |
+-------+------------------+-----------------+------------------+-----------+--------+



                                                                                

**Convert revenue and cogs into correct dtype**

In [6]:
from pyspark.sql.functions import regexp_replace, col
df = df.withColumn("revenue", regexp_replace("revenue", r"[\$.]", "")) \
               .withColumn("revenue", regexp_replace("revenue", ",", ".")) \
               .withColumn("revenue", col("revenue").cast("double")) \
               .withColumn("cogs", regexp_replace("cogs", r"[\$.]", "")) \
               .withColumn("cogs", regexp_replace("cogs", ",", ".")) \
               .withColumn("cogs", col("cogs").cast("double"))
df.printSchema()

root
 |-- productid: string (nullable = true)
 |-- date: date (nullable = true)
 |-- zip: string (nullable = true)
 |-- units: integer (nullable = true)
 |-- revenue: double (nullable = true)
 |-- cogs: double (nullable = true)



**Checking for duplicates**

In [7]:
total_count = df.count()
distinct_count = df.distinct().count()
if total_count != distinct_count:
    print(total_count - distinct_count)
else:
    print("The are no duplicates")

[Stage 9:===>                                                     (1 + 15) / 16]

38


                                                                                

In [8]:
df = df.dropDuplicates()
df.show()

[Stage 13:>                                                         (0 + 1) / 1]

+---------+----------+-----+-----+-------+-------+
|productid|      date|  zip|units|revenue|   cogs|
+---------+----------+-----+-----+-------+-------+
|     1722|2019-03-03|21874|    1|1038.87| 532.94|
|      582|2022-03-22|76210|    1|4283.37|3193.25|
|     1397|2018-11-14|94551|    1|4283.37|4385.31|
|     2071|2011-06-17|32407|    1|4283.37|2756.78|
|     2071|2011-07-01|98684|    1|4283.37|2756.78|
|      993|2017-01-30|48455|    1|4283.37|3750.52|
|      993|2016-08-08|17601|    1|4283.37|3750.52|
|     2354|2013-12-28|91387|    1|4283.37| 4626.9|
|     2214|2014-05-11|48144|    1|4283.37|4667.59|
|     2334|2015-10-13|78140|    1|4283.37|2657.83|
|     2334|2015-05-23|97116|    1|4283.37|2657.83|
|     2334|2014-04-28|31407|    1|4283.37|2657.83|
|     2334|2014-09-18|75501|    1|4283.37|2657.83|
|     1115|2014-02-20|16433|    1|4283.37|4449.99|
|     1115|2015-09-11|98023|    1|4283.37|4449.99|
|     1115|2014-02-15|53168|    1|4283.37|4449.99|
|     1215|2011-04-03|98204|   

                                                                                

**Find count for empty, None, Null, Nan with string literals.**

In [9]:
from pyspark.sql.functions import col, isnan, when, count

df.select([
    count(
        when(
            (col(c).contains('None')) |
            (col(c).contains('NULL')) |
            (col(c) == '') |
            (col(c).isNull()),
            c
        )
    ).alias(c)
    for c in df.columns
]).show()




+---------+----+---+-----+-------+----+
|productid|date|zip|units|revenue|cogs|
+---------+----+---+-----+-------+----+
|        0|   0|  0|    0|     41|   0|
+---------+----+---+-----+-------+----+



                                                                                

**Fill null values in Revenue with the closest date's value for the same product ID.**

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, datediff, row_number, abs
from pyspark.sql.window import Window

In [11]:
df_null = df.filter(F.col("revenue").isNull())
df_no_null = df.filter(F.col("revenue").isNotNull())

In [12]:
joined_df = df_null.alias("a").join(
    df_no_null.alias("b"),
    on=F.col("a.productid") == F.col("b.productid"),
    how="inner"
)
joined_df.show()

[Stage 23:>                                                         (0 + 1) / 1]

+---------+----------+-----+-----+-------+----+---------+----------+-----+-----+-------+-------+
|productid|      date|  zip|units|revenue|cogs|productid|      date|  zip|units|revenue|   cogs|
+---------+----------+-----+-----+-------+----+---------+----------+-----+-----+-------+-------+
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2021-09-10|64086|    1| 3269.7|3370.08|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2020-10-30|62284|    1| 3269.7|3370.08|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2022-05-10|99320|    1| 3269.7|3370.08|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2020-05-19|62264|    1| 3206.7|3305.15|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2020-03-18|98188|    1| 3206.7|3305.15|
|     1176|2022-06-20|33056|    1|   NULL| 0.0|     1176|2021-11-29|91766|    1|8000.37|7529.95|
|     1176|2022-06-20|33056|    1|   NULL| 0.0|     1176|2021-07-24|92223|    1|8000.37|7529.95|
|     2197|2022-07-01| 6379|  

                                                                                

In [13]:
#Tính độ chênh lệch ngày (diff_date)
joined_df = joined_df.withColumn(
    "diff_date",
    abs(datediff(F.col("a.date"), F.col("b.date")))
)

joined_df.show()

[Stage 29:>                                                         (0 + 1) / 1]

+---------+----------+-----+-----+-------+----+---------+----------+-----+-----+-------+-------+---------+
|productid|      date|  zip|units|revenue|cogs|productid|      date|  zip|units|revenue|   cogs|diff_date|
+---------+----------+-----+-----+-------+----+---------+----------+-----+-----+-------+-------+---------+
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2021-09-10|64086|    1| 3269.7|3370.08|      294|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2020-10-30|62284|    1| 3269.7|3370.08|      609|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2022-05-10|99320|    1| 3269.7|3370.08|       52|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2020-05-19|62264|    1| 3206.7|3305.15|      773|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2020-03-18|98188|    1| 3206.7|3305.15|      835|
|     1176|2022-06-20|33056|    1|   NULL| 0.0|     1176|2021-11-29|91766|    1|8000.37|7529.95|      203|
|     1176|2022-06-20|33056|    1|   

                                                                                

In [14]:
#Chọn dòng có diff_date nhỏ nhất cho mỗi dòng df_null
window_spec = Window.partitionBy("a.productid", "a.date").orderBy("diff_date")

joined_df = joined_df.withColumn(
    "row_num",
    row_number().over(window_spec)
).filter(F.col("row_num") == 1)

joined_df.show()

[Stage 39:>                                                       (0 + 16) / 16]

+---------+----------+-----+-----+-------+----+---------+----------+-----+-----+-------+-------+---------+-------+
|productid|      date|  zip|units|revenue|cogs|productid|      date|  zip|units|revenue|   cogs|diff_date|row_num|
+---------+----------+-----+-----+-------+----+---------+----------+-----+-----+-------+-------+---------+-------+
|     1176|2022-06-20|33056|    1|   NULL| 0.0|     1176|2022-06-21|73072|    1|7370.37|6936.99|        1|      1|
|     1541|2022-06-17|92086|    1|   NULL| 0.0|     1541|2022-02-10|92562|    1|5543.37| 4659.2|      127|      1|
|     1542|2022-06-17|92086|    1|   NULL| 0.0|     1542|2022-02-10|92562|    1|5543.37|4892.02|      127|      1|
|     2197|2022-07-01| 6379|    1|   NULL| 0.0|     2197|2022-07-01|98841|    1| 3269.7|3370.08|        0|      1|
|      719|2022-06-14|29063|    1|   NULL| 0.0|      719|2022-06-14|19320|    1|1259.37| 981.55|        0|      1|
|      720|2022-06-14|29063|    1|   NULL| 0.0|      720|2022-06-14|19320|    1|

                                                                                

In [17]:
# Lấy các cột cần thiết từ joined_df
joined_final = joined_df.select(
    F.col("a.productid").alias("productid"),
    F.col("a.date").alias("date"),
    F.col("a.zip"),
    F.col("a.units"),
    F.col("b.revenue").alias("revenue"),  # giá trị revenue thay thế
    F.col("b.cogs")
)
joined_final.show()

[Stage 44:>                                                         (0 + 1) / 1]

+---------+----------+-----+-----+-------+-------+
|productid|      date|  zip|units|revenue|   cogs|
+---------+----------+-----+-----+-------+-------+
|     1176|2022-06-20|33056|    1|7370.37|6936.99|
|     1541|2022-06-17|92086|    1|5543.37| 4659.2|
|     1542|2022-06-17|92086|    1|5543.37|4892.02|
|     2197|2022-07-01| 6379|    1| 3269.7|3370.08|
|      719|2022-06-14|29063|    1|1259.37| 981.55|
|      720|2022-06-14|29063|    1|1259.37| 742.78|
+---------+----------+-----+-----+-------+-------+



                                                                                

In [18]:
joined_final.printSchema()

root
 |-- productid: string (nullable = true)
 |-- date: date (nullable = true)
 |-- zip: string (nullable = true)
 |-- units: integer (nullable = true)
 |-- revenue: double (nullable = true)
 |-- cogs: double (nullable = true)



In [15]:
df_no_null.printSchema()

root
 |-- productid: string (nullable = true)
 |-- date: date (nullable = true)
 |-- zip: string (nullable = true)
 |-- units: integer (nullable = true)
 |-- revenue: double (nullable = true)
 |-- cogs: double (nullable = true)



In [19]:
df_no_null.describe().show()

[Stage 54:>                                                       (0 + 16) / 16]

+-------+------------------+-----------------+-------------------+-----------------+-----------------+
|summary|         productid|              zip|              units|          revenue|             cogs|
+-------+------------------+-----------------+-------------------+-----------------+-----------------+
|  count|            976164|           976164|             976164|           976164|           976164|
|   mean|1223.3825115451912|55682.32204424666|  1.029981642428936|5837.510154679869|4825.150179621349|
| stddev| 693.3109458317431|28954.12198581438|0.32668248558592394|4659.671349878866|4134.226111950511|
|    min|                 1|            10001|                  1|           427.77|           214.53|
|    max|               999|            99950|                 44|         334548.9|        312271.08|
+-------+------------------+-----------------+-------------------+-----------------+-----------------+



                                                                                

In [20]:
final_df = df_no_null.union(joined_final)

In [None]:
final_df.describe().show()

[Stage 68:>                                                       (0 + 17) / 17]

+-------+------------------+------------------+------------------+------------------+-----------------+
|summary|         productid|               zip|             units|           revenue|             cogs|
+-------+------------------+------------------+------------------+------------------+-----------------+
|  count|            976170|            976170|            976170|            976170|           976170|
|   mean|1223.3830797914297|55682.268405093375|1.0299814581476587| 5837.499112022414|4825.142631467772|
| stddev| 693.3100417742106|28954.157018734848| 0.326681490067531|4659.6626395206895|4134.218120412356|
|    min|                 1|             10001|                 1|            427.77|           214.53|
|    max|               999|             99950|                44|          334548.9|        312271.08|
+-------+------------------+------------------+------------------+------------------+-----------------+



                                                                                

25/06/02 00:51:58 WARN NettyRpcEnv: Ignored failure: java.util.concurrent.TimeoutException: Cannot receive any reply from archlinux:40107 in 10000 milliseconds
