In [1]:
!pip install polars pyspark

# Project file structure
# .
# ├── README.txt
# ├── clean_data.ipynb
# ├── clean_data.py
# └── data
#     ├── output
#     ├── raw
#     │ └── log20170201.csv
#     └── staging
#         └── clean_data.csv



In [2]:
import polars as pl

# Read the raw log into a polars DataFrame and clean it in a single pipeline:
#   1. join the `date` and `time` columns with a space and parse the result
#      into a new `datetime` column;
#   2. keep only the columns used in the later analysis, in their final
#      order, renaming the misspelled source column `extention` to
#      `extension` on the way (the original `date`/`time` columns are dropped).
po = (
    pl.read_csv('data/raw/log20170201.csv')
    .with_columns(
        pl.concat_str([pl.col("date"), pl.col("time")], separator=" ")
        .str.to_datetime()
        .alias("datetime")
    )
    .select([
        "ip",
        "datetime",
        pl.col("extention").alias("extension"),
        "size",
    ])
)

# Persist the cleaned frame to the staging area
po.write_csv("data/staging/clean_data.csv")

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

# Build the Spark session step by step: name the application, enable off-heap
# memory with a 10g budget, then start the session (or reuse a running one).
session_builder = SparkSession.builder.appName("Pyspark Session")
session_builder = session_builder.config("spark.memory.offHeap.enabled", "true")
session_builder = session_builder.config("spark.memory.offHeap.size","10g")
spark = session_builder.getOrCreate()

# Load the staged CSV into a Spark DataFrame; the first row holds the column names
df = spark.read.csv("data/staging/clean_data.csv", header=True)

24/04/10 17:28:51 WARN Utils: Your hostname, Electric-Red-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.87.217.204 instead (on interface en0)
24/04/10 17:28:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/10 17:28:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/10 17:28:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/04/10 17:28:52 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
# Convert the `datetime` column to a real timestamp column.
# The staged CSV is loaded with header=True only, so every column arrives as a
# string. A plain cast accepts both the space-separated and the ISO-8601
# "T"-separated spellings, so the parse does not depend on the exact format the
# staging file was written with.
df_timestamp = df.withColumn("timestamp", fn.col("datetime").cast("timestamp"))

# Sessionize per ip address: consecutive requests from the same ip belong to
# the same session as long as the gap between them is under 30 minutes.
# For every session, aggregate:
#   - total_filesize:  sum of `size` (cast explicitly, since it is read as a string)
#   - total_downloads: number of rows with a non-null `extension`
# No watermark is set: watermarks only matter for streaming queries and this is
# a batch DataFrame.
sessions = (
    df_timestamp
    .groupBy("ip", fn.session_window("timestamp", "30 minutes"))
    .agg(
        fn.sum(fn.col("size").cast("double")).alias("total_filesize"),
        fn.count("extension").alias("total_downloads"),
    )
    # Ids are unique and increasing but NOT consecutive (they encode the partition).
    .withColumn("session_id", fn.monotonically_increasing_id())
    .select(
        "session_id",
        "ip",
        "session_window",
        "total_filesize",
        "total_downloads",
    )
)
sessions.show(truncate=False)



+----------+-------------+------------------------------------------+--------------+---------------+
|session_id|ip           |session_window                            |total_filesize|total_downloads|
+----------+-------------+------------------------------------------+--------------+---------------+
|0         |1.115.247.aag|{2017-02-01 15:27:39, 2017-02-01 15:57:39}|96193.0       |1              |
|1         |1.123.152.eaj|{2017-02-01 23:57:32, 2017-02-02 00:27:32}|5173701.0     |1              |
|2         |1.123.153.bgg|{2017-02-01 23:06:36, 2017-02-01 23:36:36}|94842.0       |1              |
|3         |1.124.48.jhf |{2017-02-01 09:11:54, 2017-02-01 09:41:54}|4117.0        |1              |
|4         |1.124.48.jie |{2017-02-01 01:43:07, 2017-02-01 02:13:07}|3522.0        |1              |
|5         |1.128.97.abf |{2017-02-01 22:37:42, 2017-02-01 23:08:58}|436842.0      |2              |
|6         |1.136.96.bbi |{2017-02-01 13:57:37, 2017-02-01 14:27:37}|10610.0       |1      

                                                                                

In [5]:
# Ten largest sessions, ranked by the total size of the documents downloaded
sessions.sort("total_filesize", ascending=False).show(n=10)

24/04/10 17:29:07 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

+-----------+---------------+--------------------+----------------+---------------+
| session_id|             ip|      session_window|  total_filesize|total_downloads|
+-----------+---------------+--------------------+----------------+---------------+
|60129545457|130.238.165.jda|{2017-02-01 17:33...|2.32727971591E11|          35856|
|51539621692| 54.160.235.hji|{2017-02-01 00:00...|1.99631949425E11|          63338|
|42949684684| 205.156.84.cef|{2017-02-01 00:29...|1.13748288293E11|          12572|
|60129545186| 128.138.64.edf|{2017-02-01 00:00...| 9.1248945065E10|         200912|
|34359753386|   54.69.84.iji|{2017-02-01 09:30...| 8.7162904211E10|         107058|
|      13182|  54.84.100.jga|{2017-02-01 14:01...| 5.7959667905E10|         125945|
|25769818479|    52.8.47.hbf|{2017-02-01 19:00...| 4.9454094002E10|          51093|
|      13181|  54.84.100.jga|{2017-02-01 03:56...| 4.6276299204E10|          77353|
|25769805137| 108.39.205.jga|{2017-02-01 01:31...| 4.3127442896E10|         

                                                                                

In [6]:
# Ten busiest sessions, ranked by the number of documents downloaded
sessions.sort("total_downloads", ascending=False).show(n=10)



+-----------+---------------+--------------------+---------------+---------------+
| session_id|             ip|      session_window| total_filesize|total_downloads|
+-----------+---------------+--------------------+---------------+---------------+
|17179871924|130.101.154.hhj|{2017-02-01 00:00...|2.1910193809E10|        2214095|
|17179881439|  54.152.17.ccg|{2017-02-01 00:00...|   2.84386158E9|        1244430|
|25769821628| 68.180.231.abf|{2017-02-01 00:00...|  4.984942651E9|         706111|
|17179878659|217.174.255.dgd|{2017-02-01 00:01...|3.6495901717E10|         574257|
|34359741842| 138.19.163.aca|{2017-02-01 14:56...|    3.1456605E9|         438989|
| 8589952514| 72.234.116.hbh|{2017-02-01 00:00...|  7.375972345E9|         337605|
|60129545417|  13.93.154.hjh|{2017-02-01 03:50...|    6.7317976E8|         337070|
|51539609062|  108.91.91.hbc|{2017-02-01 11:01...|  5.105233169E9|         312716|
|68719477917|  52.119.57.ajg|{2017-02-01 00:00...|  1.487874837E9|         294287|
|601

                                                                                