# prepare data

In [1]:
import os
import csv
import uuid
import random
import json

from pyspark import SparkConf, SparkContext

default_nes = [ 'ne_{}'.format(str(uuid.uuid4())) for x in range(10) ]
default_users = [ 'user_{}'.format(str(uuid.uuid4())) for x in range(10)]

fields = [ ['f1','f2'],['f3','f4'],['f5','f6'],['f7','f8']]
    
xdrs = ['xdr1','xdr2','xdr3','xdr4']

all_data = {}

for xdr, field in zip(xdrs,fields):
    xdr_records = []
    for i in range(100):
        row = {}
        row['ne'] = random.choice(default_nes)
        row['user'] = random.choice(default_users)
        row[field[0]] = random.randrange(1,100)
        row[field[1]] = random.randrange(1,100)
        xdr_records.append(row)
    all_data[xdr]= xdr_records
    
for xdr in xdrs:
    with open(f'{xdr}.json','w') as f:
        f.write(json.dumps(all_data[xdr], indent=2))

# initialize spark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("zeropadding").getOrCreate()

# load data into xdr df

In [3]:
dfs = []
for xdr in xdrs:
    df = spark.read.format('json').option("multiLine", True).load(f'{xdr}.json')
    dfs.append(df)

In [4]:
dfs[0].show(5)

+---+---+--------------------+--------------------+
| f1| f2|                  ne|                user|
+---+---+--------------------+--------------------+
| 82| 12|ne_ef2a3cdb-72fa-...|user_b8446a27-51d...|
| 85| 26|ne_af272970-917c-...|user_bd5dfa72-0b1...|
| 93| 29|ne_dc84d9a8-0269-...|user_bd5dfa72-0b1...|
| 11| 74|ne_221903ce-6d3f-...|user_43d99f1a-a04...|
| 69|  3|ne_ef2a3cdb-72fa-...|user_bd5dfa72-0b1...|
+---+---+--------------------+--------------------+
only showing top 5 rows



# zero padding with join

In [5]:
dfa = dfs[0].join(dfs[1], ['ne','user'], how='outer') \
    .join(dfs[2], ['ne','user'], how='outer') \
    .join(dfs[3], ['ne','user'], how='outer') \
    .na.fill(0)

In [6]:
dfa.show()

+--------------------+--------------------+---+---+---+---+---+---+---+---+
|                  ne|                user| f1| f2| f3| f4| f5| f6| f7| f8|
+--------------------+--------------------+---+---+---+---+---+---+---+---+
|ne_b9f47d9b-2d1d-...|user_9c4acb8b-1eb...| 70| 28|  0|  0| 86| 30|  0|  0|
|ne_b9f47d9b-2d1d-...|user_9c4acb8b-1eb...| 70| 28|  0|  0| 78| 74|  0|  0|
|ne_ab326a59-6721-...|user_86d840a9-ad0...| 81|  1| 18| 83| 58| 76| 39| 88|
|ne_ab326a59-6721-...|user_86d840a9-ad0...| 81|  1| 18| 83| 58| 76| 15| 86|
|ne_00816a9d-e371-...|user_bd5dfa72-0b1...|  0|  0| 27| 55| 92| 57| 91| 88|
|ne_00816a9d-e371-...|user_bd5dfa72-0b1...|  0|  0| 27| 55| 92| 57| 76| 36|
|ne_00816a9d-e371-...|user_bd5dfa72-0b1...|  0|  0| 27| 55| 92| 57| 58| 10|
|ne_00816a9d-e371-...|user_bd5dfa72-0b1...|  0|  0| 27| 55| 52| 10| 91| 88|
|ne_00816a9d-e371-...|user_bd5dfa72-0b1...|  0|  0| 27| 55| 52| 10| 76| 36|
|ne_00816a9d-e371-...|user_bd5dfa72-0b1...|  0|  0| 27| 55| 52| 10| 58| 10|
|ne_221903ce

measure aggregation

In [7]:
dfa.groupBy(['ne','user']).sum().show()

+--------------------+--------------------+-------+-------+-------+-------+-------+-------+-------+-------+
|                  ne|                user|sum(f1)|sum(f2)|sum(f3)|sum(f4)|sum(f5)|sum(f6)|sum(f7)|sum(f8)|
+--------------------+--------------------+-------+-------+-------+-------+-------+-------+-------+-------+
|ne_b9f47d9b-2d1d-...|user_9c4acb8b-1eb...|    140|     56|      0|      0|    164|    104|      0|      0|
|ne_ab326a59-6721-...|user_86d840a9-ad0...|    162|      2|     36|    166|    116|    152|     54|    174|
|ne_00816a9d-e371-...|user_bd5dfa72-0b1...|      0|      0|    162|    330|    432|    201|    450|    268|
|ne_221903ce-6d3f-...|user_86d840a9-ad0...|      0|      0|    102|    195|      0|      0|    166|    146|
|ne_8e0a9af2-a9fb-...|user_bd5dfa72-0b1...|     74|     22|    100|     61|     32|     92|      0|      0|
|ne_dc84d9a8-0269-...|user_9c4acb8b-1eb...|     89|     88|     24|    174|     80|    190|    118|    132|
|ne_00816a9d-e371-...|user_9

# Zero Padding with Map

In [8]:
def get_missing_cols(xdr_index, all_fields):
    result = []
    for index, fields in enumerate(all_fields):
        if index != xdr_index:
            for field in fields:
                result.append(field)
    return result

print(get_missing_cols(1, fields))
        

['f1', 'f2', 'f5', 'f6', 'f7', 'f8']


fill missing col with 0 

In [10]:
from pyspark.sql.functions import lit

new_dfs = []
for index, df in enumerate(dfs):
    missing_cols = get_missing_cols(index, fields)
    new_df = df
    for missing_col in missing_cols:
        new_df = new_df.withColumn(missing_col, lit(0))
    new_df.show(5)
    new_dfs.append(new_df)

+---+---+--------------------+--------------------+---+---+---+---+---+---+
| f1| f2|                  ne|                user| f3| f4| f5| f6| f7| f8|
+---+---+--------------------+--------------------+---+---+---+---+---+---+
| 82| 12|ne_ef2a3cdb-72fa-...|user_b8446a27-51d...|  0|  0|  0|  0|  0|  0|
| 85| 26|ne_af272970-917c-...|user_bd5dfa72-0b1...|  0|  0|  0|  0|  0|  0|
| 93| 29|ne_dc84d9a8-0269-...|user_bd5dfa72-0b1...|  0|  0|  0|  0|  0|  0|
| 11| 74|ne_221903ce-6d3f-...|user_43d99f1a-a04...|  0|  0|  0|  0|  0|  0|
| 69|  3|ne_ef2a3cdb-72fa-...|user_bd5dfa72-0b1...|  0|  0|  0|  0|  0|  0|
+---+---+--------------------+--------------------+---+---+---+---+---+---+
only showing top 5 rows

+---+---+--------------------+--------------------+---+---+---+---+---+---+
| f3| f4|                  ne|                user| f1| f2| f5| f6| f7| f8|
+---+---+--------------------+--------------------+---+---+---+---+---+---+
| 90| 32|ne_af272970-917c-...|user_b8446a27-51d...|  0|  0|  0|

In [12]:
import functools 

def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 


unioned_df = unionAll(new_dfs)
unioned_df.show(5) 
unioned_df.repartition(1).write.format('com.databricks.spark.csv').save("unioned_df.csv",header = 'true')

+---+---+--------------------+--------------------+---+---+---+---+---+---+
| f1| f2|                  ne|                user| f3| f4| f5| f6| f7| f8|
+---+---+--------------------+--------------------+---+---+---+---+---+---+
| 82| 12|ne_ef2a3cdb-72fa-...|user_b8446a27-51d...|  0|  0|  0|  0|  0|  0|
| 85| 26|ne_af272970-917c-...|user_bd5dfa72-0b1...|  0|  0|  0|  0|  0|  0|
| 93| 29|ne_dc84d9a8-0269-...|user_bd5dfa72-0b1...|  0|  0|  0|  0|  0|  0|
| 11| 74|ne_221903ce-6d3f-...|user_43d99f1a-a04...|  0|  0|  0|  0|  0|  0|
| 69|  3|ne_ef2a3cdb-72fa-...|user_bd5dfa72-0b1...|  0|  0|  0|  0|  0|  0|
+---+---+--------------------+--------------------+---+---+---+---+---+---+
only showing top 5 rows



measure aggregation

In [13]:
unioned_df.groupBy(['ne','user']).sum().show()

+--------------------+--------------------+-------+-------+-------+-------+-------+-------+-------+-------+
|                  ne|                user|sum(f1)|sum(f2)|sum(f3)|sum(f4)|sum(f5)|sum(f6)|sum(f7)|sum(f8)|
+--------------------+--------------------+-------+-------+-------+-------+-------+-------+-------+-------+
|ne_b9f47d9b-2d1d-...|user_9c4acb8b-1eb...|     70|     28|      0|      0|    164|    104|      0|      0|
|ne_ab326a59-6721-...|user_86d840a9-ad0...|     81|      1|     18|     83|     58|     76|     54|    174|
|ne_8e0a9af2-a9fb-...|user_bd5dfa72-0b1...|     37|     11|    100|     61|     16|     46|      0|      0|
|ne_00816a9d-e371-...|user_bd5dfa72-0b1...|      0|      0|     27|     55|    144|     67|    225|    134|
|ne_221903ce-6d3f-...|user_86d840a9-ad0...|      0|      0|     34|     65|      0|      0|    166|    146|
|ne_dc84d9a8-0269-...|user_9c4acb8b-1eb...|     89|     88|     12|     87|     40|     95|     59|     66|
|ne_00816a9d-e371-...|user_9