In [159]:
import findspark

# findspark locates the local Spark installation so that pyspark can be imported below.
findspark.init()

from pyspark.sql import SparkSession

# Reuse an active session if one exists, otherwise start a new one for this notebook.
spark = (
    SparkSession.builder
    .appName("Telecom ETL")
    .getOrCreate()
)


In [160]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

#### Define schema for the DataFrame

In [161]:

# Positional layout of the pipe-delimited input (no header row):
#   imsi | imei | cell | lac | eventType | eventTs
# NOTE: the nullable=False flags are not enforced when reading CSV. Rows with a null
# imsi/cell/lac/eventTs still come through and are routed to rejected_df further down.
schema = StructType(
    [
        StructField("imsi", StringType(), nullable=False),
        StructField("imei", StringType(), nullable=True),
        StructField("cell", IntegerType(), nullable=False),
        StructField("lac", IntegerType(), nullable=False),
        StructField("eventType", StringType(), nullable=True),
        StructField("eventTs", TimestampType(), nullable=False),
    ]
)

#### Read data with specified schema and delimiter

In [162]:
# Read the raw extract with the explicit schema (no inference) and tag every row
# with a placeholder source-file name.
df = (
    spark.read
    .schema(schema)
    .option("delimiter", "|")
    .option("header", 'false')
    .csv("ercsn_4g_20200512182929_part02.csv_processing")
    .withColumn("filename", lit("someFileName"))
)
df.show(truncate=False)

+---------------+---------------+-----+-----+---------+-------------------+------------+
|imsi           |imei           |cell |lac  |eventType|eventTs            |filename    |
+---------------+---------------+-----+-----+---------+-------------------+------------+
|310120265624299|490154203237518|1234 |99   |1        |2020-06-15 07:45:43|someFileName|
|310120265624299|490154203237518|5432 |54   |2        |2020-06-15 12:12:43|someFileName|
|310120265624299|490154203237518|4321 |54   |1        |2020-06-15 15:41:43|someFileName|
|310120265624299|490154203237518|4657 |99   |4        |2020-06-15 19:11:43|someFileName|
|310120265624299|490154203237518|1234 |99   |3        |2020-06-15 20:00:43|someFileName|
|310120265624234|490154203237543|123  |22   |1        |2020-06-15 12:12:43|someFileName|
|310120265624234|490154203237543|456  |21   |1        |2020-06-15 15:31:43|someFileName|
|310120265624234|490154203237543|567  |65   |2        |2020-06-15 17:53:43|someFileName|
|310120265624234|4901

#### Filter rejected records

A record is rejected when any mandatory field (`imsi`, `cell`, `lac`, `eventTs`) is null.

In [163]:
# A record is rejected as soon as any of its mandatory fields is null.
mandatory_fields = ["imsi", "cell", "lac", "eventTs"]

is_rejected = col(mandatory_fields[0]).isNull()
for field_name in mandatory_fields[1:]:
    is_rejected = is_rejected | col(field_name).isNull()

rejected_df = df.filter(is_rejected)
rejected_df.show()

+------+--------------+-----+-----+---------+-------------------+------------+
|  imsi|          imei| cell|  lac|eventType|            eventTs|    filename|
+------+--------------+-----+-----+---------+-------------------+------------+
|  null|    3214324134|21421|12421|        2|2020-06-15 12:12:43|someFileName|
|214214|12421412421124| null|  124|        1|2020-06-15 12:12:43|someFileName|
|214214|12421412421124|   11| null|        1|2020-06-15 12:12:43|someFileName|
|214214|12421412421124|   11|  444|        1|               null|someFileName|
+------+--------------+-----+-----+---------+-------------------+------------+



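#### Derive new columns and filter valid records

`tac` and `snr` are split out of `imei`. Records whose IMEI is missing or shorter than 15 characters get `-99999` in both columns. Only records with all mandatory fields present are kept.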

In [164]:
# Split the IMEI into TAC and SNR. An IMEI that is missing or shorter than 15 characters
# cannot be split, so both derived fields get the sentinel "-99999" instead.
# NOTE(review): this yields a 7-char TAC and an SNR made of the remaining characters.
# The 3GPP IMEI layout is an 8-digit TAC, a 6-digit SNR and a check digit, so confirm the intended split.
imei_unusable = col("imei").isNull() | (length(col("imei")) < 15)

new_cols_df = (
    df
    # Spark's substring() is 1-based: position 1 is the first character.
    .withColumn("tac", when(imei_unusable, "-99999").otherwise(substring(col("imei"), 1, 7)))
    .withColumn("snr", when(imei_unusable, "-99999").otherwise(substring(col("imei"), 8, 13)))
    # eventTs is already read as TimestampType, so the pattern only matters if the column is
    # ever read as a string. Use a correct pattern: yyyy = year, MM = month, mm = minute,
    # ss = second. The old 'YYYY-mm-dd HH:MM:SS' mixed up week-year, month, minute and fraction.
    .withColumn("event_ts", to_timestamp(col("eventTs"), "yyyy-MM-dd HH:mm:ss"))
    # Accept exactly the complement of the rejection rule: every mandatory field is present.
    .filter(
        col("imsi").isNotNull()
        & col("cell").isNotNull()
        & col("lac").isNotNull()
        & col("eventTs").isNotNull()
    )
)

# eventTs is superseded by the derived event_ts column.
df_dropped = new_cols_df.drop("eventTs")
df_dropped.show()

+---------------+---------------+----+---+---------+------------+-------+--------+-------------------+
|           imsi|           imei|cell|lac|eventType|    filename|    tac|     snr|           event_ts|
+---------------+---------------+----+---+---------+------------+-------+--------+-------------------+
|310120265624299|490154203237518|1234| 99|        1|someFileName|4901542|03237518|2020-06-15 07:45:43|
|310120265624299|490154203237518|5432| 54|        2|someFileName|4901542|03237518|2020-06-15 12:12:43|
|310120265624299|490154203237518|4321| 54|        1|someFileName|4901542|03237518|2020-06-15 15:41:43|
|310120265624299|490154203237518|4657| 99|        4|someFileName|4901542|03237518|2020-06-15 19:11:43|
|310120265624299|490154203237518|1234| 99|        3|someFileName|4901542|03237518|2020-06-15 20:00:43|
|310120265624234|490154203237543| 123| 22|        1|someFileName|4901542|03237543|2020-06-15 12:12:43|
|310120265624234|490154203237543| 456| 21|        1|someFileName|4901542|

#### Show record counts

Accepted and rejected records should add up to the total number of input records.


In [165]:
# Every .count() launches a Spark job over the source file, so compute each count once and reuse it.
total_count = df.count()
accepted_count = df_dropped.count()
rejected_count = rejected_df.count()
processed_count = rejected_count + accepted_count

print(f"Total Number of records: {total_count}")
print(f"Total Number of Accepted records: {accepted_count}")
print(f"Total Number of rejected records: {rejected_count}")
print(f"Total Number of processed records: {processed_count}")

# The accept filter is the exact complement of the reject filter, so every input row
# must land in exactly one of the two outputs.
assert processed_count == total_count, "accepted + rejected does not reconcile with the input"

Total Number of records: 17
Total Number of Accepted records: 13
Total Number of rejected records: 4
Total Number of processed records: 17


#### Select final columns

In [166]:
# Output column order for the processed extract.
final_columns = [
    "imsi", "tac", "snr", "imei",
    "cell", "lac", "eventType", "event_ts", "filename",
]

final_df = df_dropped.select(*final_columns)
final_df.show()

+---------------+-------+--------+---------------+----+---+---------+-------------------+------------+
|           imsi|    tac|     snr|           imei|cell|lac|eventType|           event_ts|    filename|
+---------------+-------+--------+---------------+----+---+---------+-------------------+------------+
|310120265624299|4901542|03237518|490154203237518|1234| 99|        1|2020-06-15 07:45:43|someFileName|
|310120265624299|4901542|03237518|490154203237518|5432| 54|        2|2020-06-15 12:12:43|someFileName|
|310120265624299|4901542|03237518|490154203237518|4321| 54|        1|2020-06-15 15:41:43|someFileName|
|310120265624299|4901542|03237518|490154203237518|4657| 99|        4|2020-06-15 19:11:43|someFileName|
|310120265624299|4901542|03237518|490154203237518|1234| 99|        3|2020-06-15 20:00:43|someFileName|
|310120265624234|4901542|03237543|490154203237543| 123| 22|        1|2020-06-15 12:12:43|someFileName|
|310120265624234|4901542|03237543|490154203237543| 456| 21|        1|2020

#### Convert Spark DataFrame to Pandas DataFrame
##### Save the Pandas DataFrame to a CSV file

In [167]:
import pandas as pd
from pathlib import Path

# Destination of the processed extract.
# NOTE(review): absolute, user-specific path. Consider making it relative to the project directory.
OUTPUT_PATH = Path("C:/Users/Ahmed Ashraf/Desktop/Telecom-ETL/Final_data/processed.csv")

# Convert Spark DataFrame to Pandas DataFrame.
# toPandas() brings every row to the driver (per the PySpark docs), so this only suits small outputs.
pandas_df = final_df.toPandas()

# to_csv does not create missing directories, so ensure Final_data exists before writing.
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
pandas_df.to_csv(OUTPUT_PATH, index=False)

  series = series.astype(t, copy=False)


##### Stop the SparkSession

In [168]:
# Shut down the SparkSession created in the first cell; run this cell last.
spark.stop()