In [None]:
import pyspark
from pyspark import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
DATA_PATH = "A:\\PostgreSQL15\\data\\randdata.csv"
SPARK_CONF = {
    "spark.driver.memory": "20g",
    "spark.executor.memory": "20g",
    # "spark.driver.maxResultSize": "7g", 
}

In [3]:
spark = SparkSession.builder.config(
    map = SPARK_CONF
).getOrCreate()

### Raw data

In [4]:
spark_df = spark.read.csv(
    DATA_PATH,
    header = True,
    schema = "id int, randint int, randtext string, randdatetime timestamp",
    nullValue = "null",
)

In [11]:
spark_df.show(11)

+--------+-------+--------------------+-------------------+
|      id|randint|            randtext|       randdatetime|
+--------+-------+--------------------+-------------------+
|19998455|   5571|a01f5d2af3b46c9eb...|2015-11-06 00:14:33|
|19998456|   8337|    febadbbcdffbcdcf|2018-01-15 13:07:55|
|19998457|    585|       edcebdacafbae|2018-05-12 00:46:45|
|19998458|   7508|  cccfceacdedacdcfdd|2019-10-23 10:11:34|
|19998459|   2127|    bdffdaddddeaecdc|2022-01-03 09:05:43|
|    NULL|   NULL|                NULL|               NULL|
|19998461|   8783|    fefbaaafcabcbddf|2020-08-28 12:09:38|
|19998462|   1841|714396d66a0146f7e...|2021-06-10 03:40:21|
|19998463|   7720|          cddaceedfa|2014-05-07 16:44:16|
|19998464|   3627|c1f21043dbdb39f84...|2015-03-21 09:38:16|
|19998465|   7270|   acaaeacbbfbbaeddf|2021-05-15 22:53:05|
+--------+-------+--------------------+-------------------+
only showing top 11 rows



### Processed data

In [5]:
spark_df_processed = spark_df \
.dropna(
    how = "all",
) \
.dropDuplicates() \
.filter(
    (F.hour("randdatetime") < 1) | (F.hour("randdatetime") > 2),
) \
.withColumn(
    "randtext",
    F.regexp_replace("randtext", "^[^0-9]+$", ""),
) \
.cache()

In [6]:
spark_df_processed.count()

61631051

In [10]:
spark_df_processed.show(11)

+--------+-------+--------------------+-------------------+
|      id|randint|            randtext|       randdatetime|
+--------+-------+--------------------+-------------------+
|19998874|   8611|                    |2021-01-09 17:51:35|
|19999248|   8439|8e21034c5b5d15486...|2021-02-10 12:09:47|
|19999426|   8450|de83a92edbad42ce9...|2020-08-23 06:18:21|
|20000296|    556|18a566b9521bfe547...|2013-06-11 03:34:00|
|20000606|   1766|319ec138b82fe9315...|2019-01-04 16:17:31|
|20001146|   1426|                    |2013-05-05 14:11:41|
|20001639|   1957|                    |2018-09-02 04:04:18|
|20001878|   8065|                    |2015-09-25 19:20:09|
|20002086|   2058|                    |2018-11-04 13:54:52|
|20002576|   5801|054be40de042b361d...|2015-06-24 22:29:39|
|20002825|   7900|                    |2020-04-06 07:30:14|
+--------+-------+--------------------+-------------------+
only showing top 11 rows



### Aggregated data

In [7]:
spark_df_aggregated = spark_df_processed \
.groupBy(
    F.hour("randdatetime").alias("randdatetimehour"),
) \
.agg(
    F.median("randint").alias("randintmedian"),
    F.mean("randint").alias("randintmean"),
    F.countDistinct("randtext").alias("randtextdistinct"),
) \
.cache()

# SQL
# spark_df_processed.createOrReplaceTempView("spark_df_processed")
# spark_df_aggregated = spark.sql('''
#     SELECT
#         hour(randdatetime) AS randdatetimehour,
#         median(randint) AS randintmedian,
#         mean(randint) AS randintmean,
#         count(DISTINCT randtext) AS randtextdistinct
#     FROM spark_df_processed 
#     GROUP BY hour(randdatetime)
# ''').cache()

In [8]:
spark_df_aggregated.count()

22

In [9]:
spark_df_aggregated.orderBy("randdatetimehour").show()

+----------------+-------------+------------------+----------------+
|randdatetimehour|randintmedian|       randintmean|randtextdistinct|
+----------------+-------------+------------------+----------------+
|               0|       5001.0| 5000.088049141303|         2592182|
|               3|       5001.0|  5001.37859555672|         2304007|
|               4|       5006.0|  5000.11836798847|         1294408|
|               5|       4998.0|  4996.62453204293|         1295259|
|               6|       5003.0| 5000.952595330189|         1296815|
|               7|       5001.0| 4999.136625470596|         1297993|
|               8|       5000.0|   4999.3634063837|         1295740|
|               9|       4997.0| 4998.745852833511|         1294641|
|              10|       5004.0| 5000.680193187878|         1296752|
|              11|       5000.0| 5001.391688620129|         1295092|
|              12|       4992.0| 4996.467272154099|         1295340|
|              13|       5004.0|50

### Raw data merged with aggregated data

In [12]:
spark_df_merged = spark_df \
.join(
    spark_df_aggregated,
    F.hour((F.round(F.unix_timestamp("randdatetime") / 3600) * 3600).cast("timestamp")) == spark_df_aggregated.randdatetimehour,
    "left",
) \
.cache()

In [13]:
spark_df_merged.count()

100000001

In [14]:
spark_df_merged.show(20)

+--------+-------+--------------------+-------------------+----------------+-------------+------------------+----------------+
|      id|randint|            randtext|       randdatetime|randdatetimehour|randintmedian|       randintmean|randtextdistinct|
+--------+-------+--------------------+-------------------+----------------+-------------+------------------+----------------+
|19998455|   5571|a01f5d2af3b46c9eb...|2015-11-06 00:14:33|               0|       5001.0| 5000.088049141303|         2592182|
|19998456|   8337|    febadbbcdffbcdcf|2018-01-15 13:07:55|              13|       5004.0|5002.6345788950575|         1295300|
|19998457|    585|       edcebdacafbae|2018-05-12 00:46:45|            NULL|         NULL|              NULL|            NULL|
|19998458|   7508|  cccfceacdedacdcfdd|2019-10-23 10:11:34|              10|       5004.0| 5000.680193187878|         1296752|
|19998459|   2127|    bdffdaddddeaecdc|2022-01-03 09:05:43|               9|       4997.0| 4998.745852833511|  

### Entire transformation

In [11]:
# spark_df_merged = spark_df \
# .dropna(
#     how = "all",
# ) \
# .dropDuplicates() \
# .filter(
#     (F.hour("randdatetime") < 1) | (F.hour("randdatetime") > 2),
# ) \
# .withColumn(
#     "randtext",
#     F.regexp_replace("randtext", "^[^0-9]+$", ""),
# ) \
# .groupBy(
#     F.hour("randdatetime").alias("randdatetimehour"),
# ) \
# .agg(
#     F.median("randint").alias("randintmedian"),
#     F.mean("randint").alias("randintmean"),
#     F.countDistinct("randtext").alias("randtextdistinct"),
# ) \
# .alias("spark_df_aggregated") \
# .join(
#     spark_df,
#     F.hour((F.round(F.unix_timestamp("randdatetime")/3600)*3600).cast("timestamp")) == F.col("spark_df_aggregated.randdatetimehour"),
#     "right",
# ) \
# .cache()

### Visualization

#### XY Graph

In [15]:
spark_df_mean_by_month = spark_df \
.groupBy(
    F.month("randdatetime").alias("randdatetimemonth"),
) \
.agg(
    F.mean("randint").alias("randintmean"),
) \
.cache()

In [16]:
spark_df_mean_by_month.orderBy("randdatetimemonth").show()

+-----------------+------------------+
|randdatetimemonth|       randintmean|
+-----------------+------------------+
|             NULL|              NULL|
|                1| 4999.480011806014|
|                2|  4998.17410492495|
|                3| 5001.034707026573|
|                4| 5000.415738663857|
|                5| 5001.790249214541|
|                6|  4999.72198814926|
|                7| 5001.123621269975|
|                8|4998.7655839937415|
|                9| 5000.826370209681|
|               10| 4999.487245038549|
|               11| 5000.208261671315|
|               12| 5001.142630272484|
+-----------------+------------------+



In [17]:
spark_df_mean_by_month.dropna().orderBy("randdatetimemonth").pandas_api().plot(x = "randdatetimemonth", y = "randintmean")

#### Histogram

In [18]:
spark_df.select("randint").pandas_api().plot.hist(bins = 5)

#### Heatmap

In [79]:
spark_df \
.select(
    F.explode(F.split("randtext", "")).alias("randtextchar"),
) \
.groupBy(
    "randtextchar",
) \
.agg(
    F.count("randtextchar").alias("count"),
) \
.pandas_api().style.background_gradient(cmap ='viridis')

Unnamed: 0,randtextchar,count
0,7,89997382
1,3,89991974
2,8,89999067
3,0,89996284
4,f,180013462
5,5,89999482
6,6,89995005
7,e,180013868
8,d,180007076
9,c,180018732
