In [10]:
import os  # operating system functions like renaming files and directories
import shutil  # recursive file and directory operations
import glob  # pattern matching for paths
import pandas as pd  # data mangling and transforming
import bandicoot as bc  # MIT toolkit for creating bandicoot indicators
import argparse  # entering flags from the cmd line
import gnuper as gn  # the package in question
from pyspark.sql import SparkSession  # using spark context for big data files
from pyspark.sql.functions import col  # needed for function over each column

In [19]:
# Session configuration: every value below is handed to gn.Attributes in the
# next cell. Edit them here rather than inline further down.
mp_flag = True  # presumably toggles the multiprocessing path in gnuper - TODO confirm
bc_flag = True  # presumably toggles creation of bandicoot indicators - TODO confirm
# hdfs_flag is passed to gn.Attributes but was never assigned in this
# notebook, so a fresh-kernel "Run All" failed with a NameError.
# NOTE(review): False is an assumed value - set it to True if the raw data
# lives on HDFS.
hdfs_flag = False
verbose = True
clean_up = False
raw_data_path = '../../CDR/'

In [20]:
# Bundle the settings of this session into a single gnuper Attributes object.
# The flags and the raw data path are read from the session's configuration
# variables; the remaining entries are fixed for this run.
session_settings = dict(
    mp_flag=mp_flag,
    bc_flag=bc_flag,
    hdfs_flag=hdfs_flag,
    verbose=verbose,
    clean_up=clean_up,
    raw_data_path=raw_data_path,
    # capital gps
    # NOTE(review): confirm the coordinate order gn.Attributes expects.
    cap_coords=[15.500654, 32.559899],
    weekend_days=[5, 6],
    sparkmaster='yarn',
)
att = gn.Attributes(**session_settings)

In [21]:
# # --- Part 1 --- (Preprocessing of raw files and saving by user)
# Obtain the Spark session for this part; the master URL is taken from the
# session attributes.
spark = (
    SparkSession.builder
    .master(att.sparkmaster)
    .appName('cdr_extraction_part1')
    .getOrCreate()
)
print('Spark environment for Part 1 created!')

# ## antennas datasets
# Column names handed to the reader together with header=False.
location_columns = ['cell_id', 'antenna_id', 'longitude', 'latitude']

# Load the cell and antenna locations into a spark dataframe (sdf).
raw_locations = gn.read_as_sdf(
    file=att.raw_locations,
    sparksession=spark,
    header=False,
    colnames=location_columns,
    query=gn.queries.general.raw_locations_query(),
)

# Register the raw locations as a temp view and cache that table, since it
# is queried again for every day.
raw_locations.createOrReplaceTempView('table_raw_locations')
spark.catalog.cacheTable('table_raw_locations')

Spark environment for Part 1 created!


In [22]:
# Antenna locations only: drop cell_id and de-duplicate the remaining rows.
antenna_locations = raw_locations.selectExpr(
    'antenna_id', 'longitude', 'latitude'
).dropDuplicates()

# FILE: written as csv (with header) to att.antennas_file, because it is
# needed again later on. Existing output at that path is overwritten.
# NOTE(review): Spark's DataFrameWriter.csv writes a directory of part files,
# not a single csv - confirm downstream code copes with that.
antenna_locations.write.csv(
    att.antennas_file,
    mode='overwrite',
    header=True,
)
print('Antenna SDF & table created!')

# ## Preprocessing
# **Level 0**: General preprocessing of raw call detail records.
# The daily files are stored in one unified dataframe.
print('Starting with Level 0: General preprocessing of raw CDRs.')

Antenna SDF & table created!
Starting with Level 0: General preprocessing of raw CDRs.
