In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,TimestampType

# Create (or reuse) the Spark session for this notebook, running in local
# mode with the "local[*]" master.
spark = (
    SparkSession.builder
    .appName("Spark_Processor")
    .master("local[*]")
    .getOrCreate()
)

# Explicit schema for the traffic CSV, given as (column name, Spark type)
# pairs in file order. Every column is declared nullable.
_TRAFFIC_COLUMNS = [
    ("DEVICE_CODE", IntegerType()),
    ("SYSTEM_ID", IntegerType()),
    ("ORIGINE_CAR_KEY", StringType()),
    ("FINAL_CAR_KEY", StringType()),
    ("CHECK_STATUS_KEY", IntegerType()),
    ("COMPANY_ID", StringType()),
    ("PASS_DAY_TIME", TimestampType()),
]
schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _TRAFFIC_COLUMNS]
)

22/02/05 11:40:10 WARN Utils: Your hostname, amin-X556UQK resolves to a loopback address: 127.0.1.1; using 192.168.43.227 instead (on interface wlp3s0)
22/02/05 11:40:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/05 11:40:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the traffic records using the explicit schema (the first line of the
# file is a header row) and preview the first five rows.
df = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('Sample_Traffic.csv')
)
df.show(n=5)

                                                                                

+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|DEVICE_CODE|SYSTEM_ID|ORIGINE_CAR_KEY|FINAL_CAR_KEY|CHECK_STATUS_KEY|COMPANY_ID|      PASS_DAY_TIME|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|     200501|       81|       10477885|     10477885|               5|       161|2021-06-01 03:54:39|
|        155|       81|       87625017|     87625017|               5|       161|2021-06-01 04:14:21|
|     631757|       81|        8652928|      8652928|               5|       161|2021-06-01 03:58:57|
|     631757|       81|        8548123|      8548123|               5|       161|2021-06-01 04:01:38|
|     631757|       81|       24715264|     24715264|               5|       161|2021-06-01 03:56:57|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
only showing top 5 rows



In [45]:
# Size of the data set: total rows, distinct cars and distinct cameras.
DATA_COUNT = df.count()
CAR_COUNT, CAMERA_COUNT = (
    df.select(column).distinct().count()
    for column in ('FINAL_CAR_KEY', 'DEVICE_CODE')
)

for label, value in (
    ('all count: ', DATA_COUNT),
    ('car count: ', CAR_COUNT),
    ('camera count: ', CAMERA_COUNT),
):
    print(label, value)

[Stage 155:>                                                        (0 + 4) / 4]

all count:  4910236
car count:  1520404
camera count:  993


                                                                                

In [48]:
# Number of recorded passes per car; list the ten most frequently seen cars.
car_counts = df.groupBy('FINAL_CAR_KEY').count()
car_counts.orderBy(car_counts['count'].desc()).show(n=10)

[Stage 170:>                                                        (0 + 4) / 4]

+-------------+------+
|FINAL_CAR_KEY| count|
+-------------+------+
|     64111706|398901|
|     69177480|  9976|
|      8073331|  3121|
|     28621897|   509|
|     21683458|   311|
|      7633319|   308|
|      9093773|   305|
|      7736465|   234|
|     27397278|   167|
|      8396536|   166|
+-------------+------+
only showing top 10 rows



                                                                                

In [75]:
# Select the most frequently seen cars and keep only their pass records.
#
# Cars with MAX_PASSES_PER_CAR passes or more are left out: in the recorded
# per-car counts three keys sit far above every other car (398901, 9976 and
# 3121 passes) - presumably placeholder or misread plates, TODO confirm.
MAX_PASSES_PER_CAR = 1000
# How many of the remaining cars to keep, ordered by number of passes.
TOP_CARS_LIMIT = 100

top_car_rows = (
    car_counts
    .filter(car_counts['count'] < MAX_PASSES_PER_CAR)
    # Break ties on the car key: sorting on 'count' alone leaves the choice
    # among equally frequent cars at the cut-off arbitrary, so the selected
    # set (and every result derived from it) could differ between runs.
    .sort(['count', 'FINAL_CAR_KEY'], ascending=[False, True])
    .select('FINAL_CAR_KEY')
    .take(TOP_CARS_LIMIT)
)
# Plain list of car keys, collected on the driver for the isin() filter.
traffic_cars = [row.FINAL_CAR_KEY for row in top_car_rows]
top_cars_df = df.filter(df.FINAL_CAR_KEY.isin(traffic_cars))
top_cars_df.show(5)

                                                                                

+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|DEVICE_CODE|SYSTEM_ID|ORIGINE_CAR_KEY|FINAL_CAR_KEY|CHECK_STATUS_KEY|COMPANY_ID|      PASS_DAY_TIME|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|        100|       81|       22810018|     22810018|               5|       161|2021-06-01 04:10:54|
|     631763|       81|        7633319|      7633319|               5|       161|2021-06-01 02:28:23|
|     206602|       81|        8614075|      8614075|               5|       161|2021-06-01 02:24:01|
|     101301|       81|        7713151|      7713151|               5|       161|2021-06-01 01:57:46|
|     202101|       81|        7713151|      7713151|               5|       161|2021-06-01 02:21:58|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
only showing top 5 rows



In [76]:
# Total number of pass records that belong to the selected cars (the rows of
# top_cars_df). The bare name on the last line makes the notebook display it.
TOP_CARS_COUNT = top_cars_df.count()
TOP_CARS_COUNT

                                                                                

7996

In [77]:
# Count how many times each selected car passed each camera:
# one row per (camera, car) pair with its number of passes.
pair_keys = ['DEVICE_CODE', 'FINAL_CAR_KEY']
camera_car_df = top_cars_df.groupBy(*pair_keys).count()
camera_car_df.show(n=5)

[Stage 409:>                                                        (0 + 4) / 4]

+-----------+-------------+-----+
|DEVICE_CODE|FINAL_CAR_KEY|count|
+-----------+-------------+-----+
|     631357|     13565906|    4|
|     900249|     11054045|    3|
|     900124|      7633319|   12|
|        135|     11086409|    1|
|     631829|     11054045|    1|
+-----------+-------------+-----+
only showing top 5 rows



                                                                                

In [78]:
from pyspark.ml.feature import StringIndexer

# Map the raw car and camera identifiers to numeric indices so they can be
# used as matrix coordinates. Each indexer is fitted on, and applied to, the
# frame produced by the previous step.
camera_car_indexed_df = camera_car_df
for source_col, index_col in (
    ('FINAL_CAR_KEY', 'CAR_INDEX'),
    ('DEVICE_CODE', 'CAMERA_INDEX'),
):
    indexer = StringIndexer(inputCol=source_col, outputCol=index_col)
    camera_car_indexed_df = indexer.fit(camera_car_indexed_df).transform(camera_car_indexed_df)

camera_car_indexed_df.show(5)
 



+-----------+-------------+-----+---------+------------+
|DEVICE_CODE|FINAL_CAR_KEY|count|CAR_INDEX|CAMERA_INDEX|
+-----------+-------------+-----+---------+------------+
|     631357|     13565906|    4|     13.0|        49.0|
|     900249|     11054045|    3|     31.0|        18.0|
|     900124|      7633319|   12|      0.0|        69.0|
|        135|     11086409|    1|      3.0|       141.0|
|     631829|     11054045|    1|     31.0|         6.0|
+-----------+-------------+-----+---------+------------+
only showing top 5 rows



                                                                                

In [79]:
# Number of distinct cameras that saw at least one of the selected cars.
camera_car_df.select('DEVICE_CODE').dropDuplicates().count()

                                                                                

331

In [80]:
from pyspark.mllib.linalg.distributed import CoordinateMatrix
def _to_matrix_entry(row):
    """Turn one indexed row into a (car index, camera index, pass count) entry."""
    # The indexer output columns hold floats (e.g. 13.0); matrix coordinates
    # must be integers.
    return (int(row['CAR_INDEX']), int(row['CAMERA_INDEX']), row['count'])


# Sparse utility matrix: rows are cars, columns are cameras, values are the
# number of passes of that car at that camera.
utility_matrix = CoordinateMatrix(camera_car_indexed_df.rdd.map(_to_matrix_entry))

                                                                                

In [81]:
# Truncated SVD of the car x camera utility matrix.
#
# The matrix is converted with toIndexedRowMatrix() rather than toRowMatrix():
# a RowMatrix drops the row indices, so the rows of the requested U factor
# could not be matched back to CAR_INDEX. With an IndexedRowMatrix every row
# of svd.U carries the index of the car it describes. The singular values in
# svd.s are the same either way.
N_SINGULAR_VALUES = 10  # number of leading singular values / vectors to keep
svd = utility_matrix.toIndexedRowMatrix().computeSVD(N_SINGULAR_VALUES, computeU=True)
print(svd.s)

22/02/05 21:30:20 WARN RowMatrix: The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.


[505.0079430191736,448.42958191481915,234.00214006787655,179.17324440605807,164.08997936278325,128.8881130237476,92.03411869027215,85.3951828912464,81.33165440646407,73.68270074109343]


22/02/05 21:30:22 WARN RowMatrix: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.


In [70]:
# Dimensions of the utility matrix: column count (cameras) first,
# then row count (cars), one value per line.
print(utility_matrix.numCols(), utility_matrix.numRows(), sep='\n')

380
200


In [None]:
# Key every pass by (car, calendar day) with the camera code as the value.
#
# PASS_DAY_TIME is declared nullable in the schema, and calling .date() on a
# missing timestamp would raise AttributeError inside the Spark job, so rows
# without a timestamp are dropped before the mapping. Only the three columns
# the mapping reads are kept when converting to an RDD.
passes_with_time = (
    df.select('FINAL_CAR_KEY', 'PASS_DAY_TIME', 'DEVICE_CODE')
    .dropna(subset=['PASS_DAY_TIME'])
)
rdd = passes_with_time.rdd.map(
    lambda row: ((row['FINAL_CAR_KEY'], row['PASS_DAY_TIME'].date()), row['DEVICE_CODE'])
)
rdd.take(5)