In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("จาก raw data สู่ Disk").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "3000m").\
        config("spark.executor.cores", "2").\
        config("spark.cores.max", "6").\
        master("spark://spark-master:7077").config('spark.jars.packages', 'com.microsoft.azure:spark-mssql-connector:1.0.2').\
        getOrCreate()

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.microsoft.azure#spark-mssql-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-49012082-b132-49f0-a730-ba732c7e7847;1.0
	confs: [default]
	found com.microsoft.azure#spark-mssql-connector;1.0.2 in central
	found com.microsoft.sqlserver#mssql-jdbc;8.4.1.jre8 in central
:: resolution report :: resolve 109ms :: artifacts dl 2ms
	:: modules in use:
	com.microsoft.azure#spark-mssql-connector;1.0.2 from central in [default]
	com.microsoft.sqlserver#mssql-jdbc;8.4.1.jre8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwn

In [2]:
# Read the access log file
raw_df = spark.read.text("access.log").withColumnRenamed('value','log_text').repartition(60)

In [3]:
! wc -l access.log

10365152 access.log


In [4]:
raw_df.count()

                                                                                

10365152

In [5]:
raw_df.rdd.getNumPartitions()

60

In [6]:
### หลังจากบรรทัดนี้เป็นฝีมือ ChatGPT-3

In [7]:
from pyspark.sql.functions import regexp_extract,col,monotonically_increasing_id, when, udf, regexp_replace

In [8]:
# Extract feature columns using regular expressions
log_df = raw_df.withColumn("ip", regexp_extract(col("log_text"), "^([\\d.]+)", 1)) \
    .withColumn("request_type", regexp_extract("log_text", r"\"(.*?)\"", 1)) \
.withColumn("status", regexp_extract("log_text", r"\"\s+(\d+)", 1))\
.withColumn("size", regexp_extract("log_text", r"\"\s+\d+\s+(\d+)", 1))\
    .withColumn("timestamp", regexp_extract(col("log_text"), r'\[(.*?)\+', 1)) \
    .withColumn("timezone", regexp_extract(col("log_text"), "\\[.+?\\s(.+?)\\]", 1))\
.withColumn("agent", regexp_extract(col("log_text"), r"\"Mozilla\/(.*?)\"", 1))\
.withColumn("OS", regexp_extract(col("log_text"), "(Windows|Linux|MacOS|iOS|Android)", 1))


In [9]:
log_df.printSchema()

root
 |-- log_text: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- request_type: string (nullable = true)
 |-- status: string (nullable = true)
 |-- size: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- agent: string (nullable = true)
 |-- OS: string (nullable = true)



In [10]:
from pyspark.sql.types import *

### สร้าง FACT

In [11]:
# split dataframe into 2 dataframes
fact_df = log_df.select("ip", "size", "status","agent","OS","timestamp")\
                .withColumn('size',col('size').cast(FloatType()))\
                .withColumn('status',col('status').cast(IntegerType())).dropna()

### สร้าง DIM_STATUS

In [12]:
status_dim_df = log_df.select("status").withColumn('status',col('status').cast(IntegerType())).dropna()

In [13]:
fact_df.printSchema()

root
 |-- ip: string (nullable = true)
 |-- size: float (nullable = true)
 |-- status: integer (nullable = true)
 |-- agent: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [14]:
status_dim_df.printSchema()

root
 |-- status: integer (nullable = true)



In [15]:
status_dim_df.count()

                                                                                

10365152

In [16]:
fact_df.count()

                                                                                

10365152

In [17]:
status_dim_df.describe().show()



+-------+------------------+
|summary|            status|
+-------+------------------+
|  count|          10365152|
|   mean|210.14194890726156|
| stddev|39.214242396992645|
|    min|               200|
|    max|               504|
+-------+------------------+



                                                                                

In [18]:
fact_df.describe().show()



+-------+-------------+------------------+------------------+------------------+--------+--------------------+
|summary|           ip|              size|            status|             agent|      OS|           timestamp|
+-------+-------------+------------------+------------------+------------------+--------+--------------------+
|  count|     10365152|          10365152|          10365152|          10365152|10365152|            10365152|
|   mean|         null|12433.109789996326|210.14194890726156| 5.008038585209003|    null|                null|
| stddev|         null| 28126.54453953198| 39.21424239699265|0.0893329807768274|    null|                null|
|    min|1.132.107.223|               0.0|               200|                  |        |22/Jan/2019:03:56...|
|    max|99.99.188.195|         1249490.0|               504|               6.0|     iOS|26/Jan/2019:20:29...|
+-------+-------------+------------------+------------------+------------------+--------+--------------------+



                                                                                

In [19]:
# add primary key and foreign key to the dataframes
new_status_dim_df = status_dim_df.distinct().withColumn("status_id", monotonically_increasing_id())

# join dataframes on foreign key and primary key
fact_status_df = fact_df.join(new_status_dim_df, fact_df.status == new_status_dim_df.status, "inner")\
.select(fact_df["*"], new_status_dim_df["status_id"])

In [20]:
fact_status_df.count()

                                                                                

10365152

In [21]:
fact_status_df.groupBy('status').count().show()



+------+-------+
|status|  count|
+------+-------+
|   206|      3|
|   500|  14266|
|   504|    103|
|   502|    798|
|   301|  67553|
|   400|    586|
|   403|   5634|
|   404| 105011|
|   408|    112|
|   414|     17|
|   200|9579825|
|   304| 340228|
|   499|  50852|
|   302| 199835|
|   405|      6|
|   401|    323|
+------+-------+



                                                                                

In [22]:
new_status_dim_df.show()

                                                                                

+------+-------------+
|status|    status_id|
+------+-------------+
|   206| 369367187456|
|   500| 463856467968|
|   504| 635655159808|
|   502| 730144440320|
|   301| 867583393792|
|   400| 996432412672|
|   403|1030792151040|
|   404|1073741824000|
|   408|1168231104512|
|   414|1314259992576|
|   200|1331439861760|
|   304|1374389534720|
|   499|1477468749824|
|   302|1563368095744|
|   405|1571958030336|
|   401|1709396983808|
+------+-------------+



In [23]:
fact_status_df.show()

                                                                                

+---------------+--------+------+--------------------+-------+--------------------+------------+
|             ip|    size|status|               agent|     OS|           timestamp|   status_id|
+---------------+--------+------+--------------------+-------+--------------------+------------+
| 87.107.218.136|    64.0|   206|4.0 (compatible; ...|Windows|23/Jan/2019:19:09...|369367187456|
| 87.107.218.136|    64.0|   206|4.0 (compatible; ...|Windows|23/Jan/2019:19:09...|369367187456|
| 185.118.137.99|202772.0|   206|5.0 (Windows NT 6...|Windows|26/Jan/2019:15:11...|369367187456|
|    5.217.85.37|    31.0|   500|5.0 (Windows NT 1...|Windows|22/Jan/2019:11:37...|463856467968|
|  2.187.127.247|    31.0|   500|5.0 (Windows NT 6...|Windows|23/Jan/2019:23:24...|463856467968|
|151.239.241.163| 35112.0|   500|5.0 (Windows NT 6...|Windows|26/Jan/2019:12:46...|463856467968|
|151.239.241.163|  1086.0|   500|5.0 (Windows NT 6...|Windows|26/Jan/2019:11:43...|463856467968|
|151.239.241.163| 35137.0|   5

In [24]:
fact_status_df.count()

                                                                                

10365152

In [25]:
##final_fact_status_df = fact_status_df.drop('status')
final_fact_status_df = fact_status_df

In [26]:
#result_join_df = final_fact_status_df.join(new_status_dim_df,on=[final_fact_status_df.status_id == new_status_dim_df.status_id])

### สร้าง DIM_AGENT

In [27]:
# split dataframe into agent Dim
agent_dim_df = log_df.select("agent","OS")\
.withColumn('agent',col('agent').cast(StringType())).dropna()\
.withColumn('OS',col('OS').cast(StringType())).dropna()

In [28]:
agent_dim_df.printSchema()

root
 |-- agent: string (nullable = true)
 |-- OS: string (nullable = true)



In [29]:
agent_dim_df.count()

                                                                                

10365152

In [30]:
agent_dim_df.describe().show()



+-------+-------------------+--------+
|summary|              agent|      OS|
+-------+-------------------+--------+
|  count|           10365152|10365152|
|   mean|  5.008038585209003|    null|
| stddev|0.08933298077682744|    null|
|    min|                   |        |
|    max|                6.0|     iOS|
+-------+-------------------+--------+



                                                                                

In [31]:
# add primary key and foreign key to the dataframes
new_agent_dim_df = agent_dim_df.distinct().withColumn("agent_id", monotonically_increasing_id())

# join dataframes on foreign key and primary key
fact_status_agent_df = final_fact_status_df.join(new_agent_dim_df, final_fact_status_df.agent == new_agent_dim_df.agent, "inner")\
.select(final_fact_status_df["*"], new_agent_dim_df["agent_id"])

In [32]:
fact_status_agent_df.count()

                                                                                

15397947

In [33]:
fact_status_agent_df.groupBy('agent').count().show()



+--------------------+-----+
|               agent|count|
+--------------------+-----+
|5.0 (Android 4.2....|  122|
|5.0 (Android 4.4....|   89|
|5.0 (Android 4.4....|   12|
|5.0 (Android 4.4....| 1287|
|5.0 (Android 7.0;...|   27|
|5.0 (BB10; Touch)...|    1|
|5.0 (Linux; Andro...|  249|
|5.0 (Linux; Andro...|    2|
|5.0 (Linux; Andro...|   19|
|5.0 (Linux; Andro...|   95|
|5.0 (Linux; Andro...|    1|
|5.0 (Linux; Andro...|   31|
|5.0 (Linux; Andro...|   34|
|5.0 (Linux; Andro...|    2|
|5.0 (Linux; Andro...|    1|
|5.0 (Linux; Andro...|   77|
|5.0 (Linux; Andro...|    1|
|5.0 (Linux; Andro...|   41|
|5.0 (Linux; Andro...|   37|
|5.0 (Linux; Andro...|   37|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [34]:
new_agent_dim_df.show()



+--------------------+-------+--------+
|               agent|     OS|agent_id|
+--------------------+-------+--------+
|5.0 (Linux; Andro...|  Linux|       0|
|5.0 (Linux; Andro...|  Linux|       1|
|5.0 (Linux; U; An...|  Linux|       2|
|5.0 (Linux; Andro...|  Linux|       3|
|5.0 (X11; Linux x...|  Linux|       4|
|5.0 (Linux; U; An...|  Linux|       5|
|5.0 (Linux; Andro...|  Linux|       6|
|5.0 (Linux; Andro...|  Linux|       7|
|5.0 (Linux; U; An...|  Linux|       8|
|5.0 (Linux; Andro...|  Linux|       9|
|5.0 (Windows NT 1...|Windows|      10|
|5.0 (Linux; Andro...|  Linux|      11|
|5.0 (Linux; Andro...|  Linux|      12|
|5.0 (Linux; Andro...|  Linux|      13|
|5.0 (Linux; Andro...|  Linux|      14|
|5.0 (Linux; Andro...|  Linux|      15|
|5.0 (Windows NT 6...|Windows|      16|
|5.0 (Linux; Andro...|  Linux|      17|
|5.0 (Linux; Andro...|  Linux|      18|
|5.0 (Linux; Andro...|  Linux|      19|
+--------------------+-------+--------+
only showing top 20 rows



                                                                                

In [35]:
fact_status_agent_df.show()



+-------------+-------+------+--------------------+-------+--------------------+-------------+------------+
|           ip|   size|status|               agent|     OS|           timestamp|    status_id|    agent_id|
+-------------+-------+------+--------------------+-------+--------------------+-------------+------------+
|37.137.254.94|    0.0|   302|5.0 (Android 4.2....|Android|24/Jan/2019:16:57...|1563368095744|824633720845|
|37.137.120.56| 4842.0|   200|5.0 (Android 4.2....|Android|23/Jan/2019:18:02...|1331439861760|824633720845|
|37.137.120.56|17046.0|   200|5.0 (Android 4.2....|Android|23/Jan/2019:18:01...|1331439861760|824633720845|
|37.137.120.56| 4859.0|   200|5.0 (Android 4.2....|Android|23/Jan/2019:18:02...|1331439861760|824633720845|
|37.137.120.56| 4336.0|   200|5.0 (Android 4.2....|Android|23/Jan/2019:18:02...|1331439861760|824633720845|
|37.137.120.56|16837.0|   200|5.0 (Android 4.2....|Android|23/Jan/2019:18:01...|1331439861760|824633720845|
|37.137.120.56|17009.0|   20

                                                                                

In [36]:
fact_status_agent_df.count()

                                                                                

15397947

In [37]:
##final_fact_status_agent_df = fact_status_agent_df.drop('agent')
final_fact_status_agent_df = fact_status_agent_df

In [38]:
final_fact_status_agent_df

DataFrame[ip: string, size: float, status: int, agent: string, OS: string, timestamp: string, status_id: bigint, agent_id: bigint]

In [39]:
#result_join_df.show()

In [40]:
#result_join_df.describe().toPandas().transpose()

### สร้าง DIM_TIME

In [41]:
from pyspark.sql import functions as sparkf

In [42]:
# split dataframe into agent Dim
time_dim_df = log_df.select("timestamp","timezone")\
.withColumn("new_timestamp",regexp_replace(regexp_replace(col('timestamp'),"(.*)(:)(.*:.*:\S*)",'$1 $3'),"/","-"))\
.withColumn("new_timestamp",sparkf.to_timestamp("new_timestamp","dd-MMM-yyyy HH:mm:ss ")).dropna()

In [43]:
time_dim_df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- new_timestamp: timestamp (nullable = true)



In [44]:
time_dim_df.select("timestamp","timezone","new_timestamp").take(1)

                                                                                

[Row(timestamp='22/Jan/2019:13:40:56 ', timezone='+0330', new_timestamp=datetime.datetime(2019, 1, 22, 13, 40, 56))]

In [45]:
time_dim_df.count()

                                                                                

10365152

In [46]:
time_dim_df

DataFrame[timestamp: string, timezone: string, new_timestamp: timestamp]

In [47]:
# add primary key and foreign key to the dataframes
new_time_dim_df = time_dim_df.distinct().withColumn("time_id", monotonically_increasing_id())

# join dataframes on foreign key and primary key
fact_status_agent_time_df = final_fact_status_agent_df.join(new_time_dim_df, final_fact_status_agent_df.timestamp == new_time_dim_df.timestamp, "inner")\
.select(final_fact_status_agent_df["*"], new_time_dim_df["time_id"])

In [48]:
fact_status_agent_df.count()

                                                                                

15397947

In [49]:
fact_status_agent_time_df.count()

                                                                                

15397947

In [50]:
##final_fact_status_agent_time_df = fact_status_agent_time_df.drop('agent')
final_fact_status_agent_time_df = fact_status_agent_time_df

In [51]:
final_fact_status_agent_time_df

DataFrame[ip: string, size: float, status: int, agent: string, OS: string, timestamp: string, status_id: bigint, agent_id: bigint, time_id: bigint]

In [52]:
result_join_df = final_fact_status_agent_time_df.join(new_agent_dim_df,on=[final_fact_status_agent_time_df.agent_id == new_agent_dim_df.agent_id])\
.join(new_status_dim_df,on=[final_fact_status_agent_time_df.status_id == new_status_dim_df.status_id])\
.join(new_time_dim_df,on=[final_fact_status_agent_time_df.time_id == new_time_dim_df.time_id])

In [53]:
fact_status_agent_time_df.groupBy('timestamp').count()\
.orderBy('count',ascending=False).show()



+--------------------+-----+
|           timestamp|count|
+--------------------+-----+
|25/Jan/2019:05:09...|  779|
|26/Jan/2019:19:07...|  515|
|25/Jan/2019:19:34...|  502|
|25/Jan/2019:19:34...|  459|
|26/Jan/2019:19:05...|  439|
|26/Jan/2019:17:18...|  439|
|26/Jan/2019:16:06...|  432|
|26/Jan/2019:14:49...|  432|
|22/Jan/2019:18:49...|  427|
|23/Jan/2019:17:10...|  424|
|23/Jan/2019:17:22...|  414|
|26/Jan/2019:19:03...|  410|
|24/Jan/2019:16:54...|  403|
|26/Jan/2019:12:59...|  397|
|26/Jan/2019:12:39...|  392|
|26/Jan/2019:14:32...|  388|
|22/Jan/2019:10:05...|  386|
|26/Jan/2019:10:36...|  386|
|24/Jan/2019:12:16...|  384|
|23/Jan/2019:14:14...|  383|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [54]:
result_join_df.count()

                                                                                

15397947

In [55]:
result_join_df.select(final_fact_status_agent_time_df.ip,new_status_dim_df.status,new_agent_dim_df.OS,final_fact_status_agent_time_df.size).show()



+--------------+------+-------+-------+
|            ip|status|     OS|   size|
+--------------+------+-------+-------+
|  195.181.1.90|   302|Windows|    0.0|
|   2.178.8.199|   499|  Linux|    0.0|
|  5.116.29.188|   200|       | 6975.0|
|  5.114.244.90|   200|       | 3294.0|
| 40.77.167.170|   200|  Linux|38995.0|
| 91.98.141.154|   200|  Linux| 2376.0|
| 91.98.141.154|   200|  Linux| 2885.0|
|  31.47.52.122|   200|  Linux|  323.0|
| 91.98.141.154|   200|  Linux|  258.0|
|  93.119.72.10|   200|  Linux|28536.0|
|  93.119.72.10|   200|  Linux|26607.0|
|  93.119.72.10|   200|  Linux| 4120.0|
|85.133.184.242|   200|Windows|10628.0|
|85.133.184.242|   200|Windows| 1214.0|
|85.133.184.242|   200|Windows|46772.0|
|  5.116.29.188|   200|Android| 6975.0|
|  5.114.244.90|   200|Android| 3294.0|
| 91.98.141.154|   200|Windows| 2376.0|
| 91.98.141.154|   200|Windows| 2885.0|
|  31.47.52.122|   200|Windows|  323.0|
+--------------+------+-------+-------+
only showing top 20 rows



                                                                                

In [56]:
result_join_df.printSchema()

root
 |-- ip: string (nullable = true)
 |-- size: float (nullable = true)
 |-- status: integer (nullable = true)
 |-- agent: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- status_id: long (nullable = false)
 |-- agent_id: long (nullable = false)
 |-- time_id: long (nullable = false)
 |-- agent: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- agent_id: long (nullable = false)
 |-- status: integer (nullable = true)
 |-- status_id: long (nullable = false)
 |-- timestamp: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- new_timestamp: timestamp (nullable = true)
 |-- time_id: long (nullable = false)



# MSSQL Connection and Write dataframe into tables

In [57]:
server_name = "jdbc:sqlserver://35.209.118.36"
database_name = "demostarschema"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "Inventory"
username = "SA"
password = "Passw0rd123456" # Please specify password here

In [None]:
final_fact_status_agent_time_df.write.mode('overwrite').format("jdbc")\
        .option("url", url) \
        .option("dbtable", "fact_ip_size") \
        .option("user", username) \
        .option("password", password)\
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").save()



In [None]:
new.write.mode('overwrite').format("jdbc")\
        .option("url", url) \
        .option("dbtable", "fact_ip_size") \
        .option("user", username) \
        .option("password", password)\
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").save()