In [10]:
from pyspark.sql import SparkSession
import pyspark.sql.types as t
import pyspark.sql.functions as f
import datetime as dt
from glob import glob

In [2]:
files = glob('/app/data/*/*.csv')
files

['/app/data/2023-07/2023-07-12 06:00:00.000000 - 2023-07-13 23:59:59.999999.csv',
 '/app/data/2023-07/2023-07-08 18:00:00.000000 - 2023-07-10 11:59:59.999999.csv',
 '/app/data/2023-07/2023-07-07 00:00:00.000000 - 2023-07-08 17:59:59.999999.csv',
 '/app/data/2023-07/2023-07-10 12:00:00.000000 - 2023-07-12 05:59:59.999999.csv',
 '/app/data/2024-07/2024-07-12 06:00:00 - 2024-07-13 23:59:59.999999.csv',
 '/app/data/2024-07/2024-07-10 12:00:00 - 2024-07-12 05:59:59.999999.csv',
 '/app/data/2024-07/2024-07-08 18:00:00 - 2024-07-10 11:59:59.999999.csv',
 '/app/data/2024-07/1.csv',
 '/app/data/2022-07/2022-07-07 00:00:00 - 2022-07-08 17:59:59.999999.csv',
 '/app/data/2022-07/2022-07-08 18:00:00 - 2022-07-10 11:59:59.999999.csv',
 '/app/data/2022-07/2022-07-12 06:00:00 - 2022-07-13 23:59:59.999999.csv',
 '/app/data/2022-07/2022-07-10 12:00:00 - 2022-07-12 05:59:59.999999.csv',
 '/app/data/2021-07/2021-07-08 18:00:00 - 2021-07-10 11:59:59.999999.csv',
 '/app/data/2021-07/2021-07-10 12:00:00 - 20

In [23]:
spark = SparkSession.builder\
    .appName("TestJob")\
    .master("spark://spark-master:7077")\
    .getOrCreate()

In [24]:
schema = t.StructType([
    t.StructField('record_type', t.StringType()),
    t.StructField('type', t.StringType()),
    t.StructField('time', t.DoubleType()),
    t.StructField('project', t.StringType()),
    t.StructField('collector', t.StringType()),
    t.StructField('router', t.StringType()),
    t.StructField('router_ip', t.StringType()),
    t.StructField('peer_asn', t.IntegerType()),
    t.StructField('peer_address', t.StringType()),
    t.StructField('prefix', t.StringType()),
    t.StructField('next_hop', t.StringType()),
    t.StructField('as_path', t.StringType()),
    t.StructField('communities', t.StringType()),
])

df = spark.read.csv('/app/data/2024-07/1.csv', header=True, schema=schema)

In [25]:
df.show(10)

+-----------+----+-------------------+----------+----------------+------+---------+--------+--------------------+-------------------+--------------------+--------------------+--------------------+
|record_type|type|               time|   project|       collector|router|router_ip|peer_asn|        peer_address|             prefix|            next_hop|             as_path|         communities|
+-----------+----+-------------------+----------+----------------+------+---------+--------+--------------------+-------------------+--------------------+--------------------+--------------------+
|     update|   W|1.720310400067833E9|routeviews|route-views.eqix|  NULL|     NULL|   16552|2001:504:0:2:0:1:...|2a06:de05:626b::/48|                NULL|                NULL|                NULL|
|     update|   W|1.720310400067833E9|routeviews|route-views.eqix|  NULL|     NULL|   16552|2001:504:0:2:0:1:...|2a06:de05:62c5::/48|                NULL|                NULL|                NULL|
|     update|  

In [26]:
start_unixtime = dt.datetime(2024, 7, 7).timestamp()
end_unixtime = dt.datetime(2024, 7, 8).timestamp()

In [46]:
df = df.filter((df.time >= start_unixtime) & (df.time < end_unixtime)) \
       .withColumn('datetime', f.from_unixtime('time'))\
       .withColumn('hour', f.date_trunc('hour', 'datetime'))\
       .withColumn('as_path_split', f.split('as_path', ' '))\
       .withColumn(
           'origin_asn', 
            f.get('as_path_split', f.size('as_path_split') - 1).cast(t.IntegerType())
       )\
       .drop('as_path_split')\
       .groupBy('hour')\
       .agg(
           f.countDistinct('peer_asn').alias('unique_peers'),
           f.countDistinct('origin_asn').alias('unique_origins'),
           f.countDistinct('prefix').alias('unique_prefixes'),
           f.count('time').alias('updates')
       )

In [47]:
df.limit(5).toPandas()

                                                                                

Unnamed: 0,hour,count(DISTINCT peer_asn),count(DISTINCT origin_asn),count(DISTINCT peer_asn).1
0,2024-07-07 22:00:00,26,21209,26
1,2024-07-07 04:00:00,26,16025,26
2,2024-07-07 21:00:00,26,26645,26
3,2024-07-07 08:00:00,25,7711,25
4,2024-07-07 01:00:00,27,18881,27


In [9]:
df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------------------+------------------------+
|               hour|count(DISTINCT peer_asn)|
+-------------------+------------------------+
|2024-07-07 00:00:00|                      26|
+-------------------+------------------------+



                                                                                

In [52]:
# spark.stop()