# Combine CSVs For Processing
Combines the CSVs so that all relevant data for each record ends up on one row, with rows from the different CSVs matched up via their shared linking keys.

Everything from this markdown cell to the next one is just a sanity check to make sure that chunking the CSVs does not produce odd output. Don't run it unless you have a reason to check.

# Combine with Spark

We use Spark (PySpark) because it automatically partitions (chunks) the data. (These notes originally said Dask; the cells below use PySpark.)

We need to decide which features we want before we can determine which CSVs need to be merged.

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.types import LongType

In [2]:
import os

# Root directory holding the source CSVs. Set the CAPSTONE_DATA_DIR environment
# variable to point somewhere else; otherwise this default path is used.
data_dir = os.environ.get(
    'CAPSTONE_DATA_DIR',
    '/home/cole/Workspace/School/Capstone/data/first_data_set/TestData/',
)

In [3]:
# Spark entry points: `sc` is the SparkContext, and `sqlContext` wraps it and
# provides the `.read` interface used to load the CSVs.
sc = SparkContext()
sqlContext = SQLContext(sparkContext=sc)
# Directory that merged output goes under; adjust for your environment.
output_dir = "{}/merge_data".format(data_dir)

In [4]:
# Tables that the merge loop left-joins onto BACKUP_OBJECTS, in join order,
# mapped to their join key. A plain string is a column name present on both
# sides; a two-item list is [column in the merged frame, column in this table].
CSV_Merge_List = dict(
    NODES='NODEID',
    FILESPACES='FSID',
    AF_BITFILES=['OBJID', 'BFID'],
    SD_RECON_ORDER='OBJID',
    SS_POOLS='POOLID',
    SD_CHUNK_LOCATIONS='POOLID',
)

In [5]:
def _load_csv(table_name):
    """Load ``<data_dir>/<table_name>.csv`` (first row is the header) as a DataFrame.

    No schema inference is requested, so every column comes back as a string.
    """
    reader = sqlContext.read.format('com.databricks.spark.csv').option("header", "true")
    return reader.load([data_dir + "/" + table_name + ".csv"])


AFBF = _load_csv("AF_BITFILES")
BACKUP_OBJECTS = _load_csv("BACKUP_OBJECTS")
ARCHIVE_OBJECTS = _load_csv("ARCHIVE_OBJECTS")
FILESPACES = _load_csv("FILESPACES")
NODES = _load_csv("NODES")
SD_CHUNK_COPIES = _load_csv("SD_CHUNK_COPIES")
SD_CHUNK_LOCATIONS = _load_csv("SD_CHUNK_LOCATIONS")
SD_CONTAINERS = _load_csv("SD_CONTAINERS")
SD_NON_DEDUP_LOCATIONS = _load_csv("SD_NON_DEDUP_LOCATIONS")
SDRO = _load_csv("SD_RECON_ORDER")
SS_POOLS = _load_csv("SS_POOLS")


In [6]:
# The CSVs are loaded as strings; give the pool/chunk keys numeric types so they
# compare correctly against each other in the filters and joins below.
AFBF = AFBF.withColumn('POOLID', F.col('POOLID').cast(IntegerType()))
SD_CONTAINERS = SD_CONTAINERS.withColumn('POOLID', F.col('POOLID').cast(IntegerType()))
SD_CHUNK_LOCATIONS = (
    SD_CHUNK_LOCATIONS
    .withColumn('CHUNKID', F.col('CHUNKID').cast(LongType()))
    .withColumn('POOLID', F.col('POOLID').cast(IntegerType()))
)

In [7]:
def _distinct_poolids(frame):
    """Return the distinct POOLID values of *frame* (a null POOLID is included) as a list."""
    return [row['POOLID'] for row in frame.select('POOLID').distinct().collect()]


# POOLIDs referenced by AF_BITFILES.
tape = _distinct_poolids(AFBF)

# TYPE is never cast, so it is compared as a string. Match it exactly instead of
# with an unanchored regex: rlike('3|4') also accepts any TYPE that merely
# contains a 3 or 4 (e.g. '13', '24'). trim() keeps tolerance for padded values.
container_type = F.trim(SD_CONTAINERS.TYPE)
cloud = set(_distinct_poolids(SD_CONTAINERS.filter(container_type.isin('3', '4'))))
# Directory pools: TYPE 1/2 pools that are not also cloud pools.
directory = set(_distinct_poolids(SD_CONTAINERS.filter(container_type.isin('1', '2')))) - cloud

In [8]:
# Display the POOLIDs found in AF_BITFILES (the `tape` list).
tape

[-1000000, -1, 6, -9, 72, 4, 82, 135, 42]

In [9]:
# Display the POOLIDs of SD_CONTAINERS rows selected by the cloud TYPE filter.
cloud

{24, 37, 38, 60, 71, 83, 95, 112, 114}

In [10]:
# Display the POOLIDs selected by the directory TYPE filter, minus any cloud pool.
directory

{27, 138}

In [11]:
# Attach AF_BITFILES (BACKUP_OBJECTS.OBJID matched to BFID) and then
# SD_RECON_ORDER (matched on OBJID) to every backup object, keeping unmatched rows.
merge = (
    BACKUP_OBJECTS
    .join(AFBF, BACKUP_OBJECTS['OBJID'] == AFBF['BFID'], how='left')
    .join(SDRO, ['OBJID'], how='left')
)

In [12]:
# Numeric POOLID/CHUNKID so the values passed to the labelling UDF are ints.
merge = (
    merge
    .withColumn('POOLID', F.col('POOLID').cast(IntegerType()))
    .withColumn('CHUNKID', F.col('CHUNKID').cast(LongType()))
)

In [13]:
# %%timeit
# CHUNKIDs stored in any cloud pool, gathered with one Spark job instead of one
# job per pool. (POOLID, CHUNKID) pairs are de-duplicated, so a chunk found in
# several cloud pools appears once per pool, as it did with a per-pool loop;
# list order is whatever Spark returns.
cloud_chunk_rows = (
    SD_CHUNK_LOCATIONS
    .filter(SD_CHUNK_LOCATIONS.POOLID.isin(list(cloud)))
    .select('POOLID', 'CHUNKID')
    .distinct()
    .collect()
)
cloud_chunkid = [row['CHUNKID'] for row in cloud_chunk_rows]

In [14]:
# %%timeit
# CHUNKIDs stored in any directory pool, gathered with one Spark job instead of
# one job per pool. (POOLID, CHUNKID) pairs are de-duplicated, so a chunk found
# in several directory pools appears once per pool, as it did with a per-pool
# loop; list order is whatever Spark returns.
directory_chunk_rows = (
    SD_CHUNK_LOCATIONS
    .filter(SD_CHUNK_LOCATIONS.POOLID.isin(list(directory)))
    .select('POOLID', 'CHUNKID')
    .distinct()
    .collect()
)
directory_chunkid = [row['CHUNKID'] for row in directory_chunk_rows]

In [None]:
from pyspark.sql.functions import udf

# assign_output runs once per row inside a UDF, so its membership test must be
# cheap: a frozenset gives O(1) lookups, whereas the list required a scan of
# every directory chunk id per row. Built from directory_chunkid as it is when
# this cell runs; re-run this cell if directory_chunkid changes.
directory_chunkid_set = frozenset(directory_chunkid)


def assign_output(poolid, chunkid):
    """Map a merged row's (POOLID, CHUNKID) to an integer OUTPUT label.

    Args:
        poolid: the row's POOLID as an int, or None when the column is null.
        chunkid: the row's CHUNKID as an int, or None when the column is null.

    Returns:
        0 if the row has a POOLID; otherwise 1 if its CHUNKID is one of the
        directory-pool chunk ids; otherwise 2 if it has any CHUNKID; else None.

    NOTE(review): cloud_chunkid is never consulted, so every chunk that is not a
    directory chunk gets label 2 — confirm that is the intended meaning of 2.
    """
    # Explicit None checks: a POOLID or CHUNKID of 0 is a real value and must
    # not be treated as missing the way a truthiness test would.
    if poolid is not None:
        return 0
    if chunkid in directory_chunkid_set:
        return 1
    if chunkid is not None:
        return 2
    return None

In [None]:
# Wrap assign_output as a Spark UDF producing an integer column.
output = udf(f=assign_output, returnType=IntegerType())

In [None]:
%%time
df_merge = merge.withColumn('OUTPUT', output(merge.POOLID, merge.CHUNKID))

CPU times: user 10.9 s, sys: 80 ms, total: 11 s
Wall time: 11.2 s


In [None]:
# Column names of the labelled frame (includes the new OUTPUT column).
df_merge.columns

In [None]:
# Preview up to 1000 (OBJID, OUTPUT) pairs, including rows whose label is null.
df_merge.select(['OBJID', 'OUTPUT']).show(n=1000)

In [None]:
# Keep only rows that received a label.
df_merge = df_merge.where(df_merge['OUTPUT'].isNotNull())

In [None]:
# Preview up to 100 labelled (OBJID, OUTPUT) pairs.
df_merge.select(['OBJID', 'OUTPUT']).show(n=100)

In [None]:
# Number of rows in df_merge (only labelled rows, once the filter above has run).
df_merge.count()

In [None]:
# List the distinct OUTPUT labels that occur.
df_merge.select('OUTPUT').dropDuplicates().show()

In [None]:
# Row count of the OUTPUT projection; select() does not drop rows, so this is
# the same number as df_merge.count().
df_merge.select('OUTPUT').count()

In [None]:
# Display the CHUNKIDs collected from the directory pools.
directory_chunkid

In [None]:
# Preview SD_RECON_ORDER rows that have a CHUNKID, with CHUNKID cast to a 64-bit integer.
df = (
    SDRO
    .where(F.col("CHUNKID").isNotNull())
    .withColumn("CHUNKID", F.col("CHUNKID").cast(LongType()))
)
df.show(n=10)

In [None]:
# POOLID column of BACKUP_OBJECTS left-joined to AF_BITFILES (OBJID == BFID).
AFBF_POOLID = (
    BACKUP_OBJECTS
    .join(AFBF, BACKUP_OBJECTS['OBJID'] == AFBF['BFID'], how='left')
    .select('POOLID')
)

In [None]:
# POOLID column of BACKUP_OBJECTS left-joined to SD_RECON_ORDER on OBJID.
SDRO_POOLID = (
    BACKUP_OBJECTS
    .join(SDRO, ['OBJID'], how='left')
    .select('POOLID')
)

In [None]:
# Column names of SD_RECON_ORDER.
SDRO.columns

In [None]:
# Distinct POOLID values present in the current `merge` DataFrame.
[row['POOLID'] for row in merge.select("POOLID").distinct().collect()]

In [None]:
# NOTE(review): `mapY` is not defined in any cell shown here, so this cell raises
# NameError as written — confirm where it is supposed to come from. When run in
# order, `df` here is the filtered SD_RECON_ORDER frame from the preview cell above.
df.groupby('POOLID').apply(mapY)

In [None]:
# Sanity check: distinct POOLIDs referenced by AF_BITFILES, as a set.
# Stored under its own name rather than `directory`: reassigning `directory`
# here would replace the SD_CONTAINERS-derived pool set that directory_chunkid
# is built from, so re-running the chunk-id cells would silently use AF_BITFILES pools.
afbf_poolids = {row['POOLID'] for row in AFBF.select("POOLID").distinct().collect()}
tape

In [None]:
# Column names of SD_CONTAINERS.
SD_CONTAINERS.columns

In [None]:
# Optional preview: SD_CONTAINERS.select('POOLID', 'TYPE').show(100)
# Every POOLID value in SD_CONTAINERS as a Python list (one per row; duplicates kept).
li2 = [row[0] for row in SD_CONTAINERS.select("POOLID").collect()]

In [None]:
# POOLIDs common to li1 and li2.
# NOTE(review): `li1` is never assigned in the cells shown (only `li2` is), so
# this raises NameError unless it was defined elsewhere — confirm its source.
set(li1).intersection(set(li2))

In [None]:
# All POOLIDs appearing in either li1 or li2.
# NOTE(review): `li1` is never assigned in the cells shown (only `li2` is), so
# this raises NameError unless it was defined elsewhere — confirm its source.
set(li1).union(set(li2))

In [None]:
# Rebuild `merge` from BACKUP_OBJECTS, left-joining each table in CSV_Merge_List
# (in dict order) on its configured key.
merge = sqlContext.read.format('com.databricks.spark.csv').option("header", "true").load([data_dir+"/BACKUP_OBJECTS.csv"])

for key, value in CSV_Merge_List.items():
    print(key, value)
    temp = sqlContext.read.format('com.databricks.spark.csv').option("header", "true").load([data_dir+"/" + key + ".csv"])
    # Dispatch on the type of the key spec, not its len(): len() of a plain
    # column name is its character count, so a two-letter column such as 'ID'
    # would be mistaken for a [left, right] pair.
    if isinstance(value, (list, tuple)):
        # [column in merge, column in temp]: join on differently named columns.
        left_key, right_key = value
        merge = merge.join(temp, merge[left_key] == temp[right_key], how='left')
    else:
        # NOTE(review): SD_CHUNK_LOCATIONS is joined on POOLID alone, so every
        # chunk location in a pool is paired with every object in that pool —
        # confirm this fan-out is intended.
        merge = merge.join(temp, [value], how='left')

In [None]:
# Keep only rows where every feature column is present, and give each column a
# numeric type. Columns are processed in the same order as before.
# OBJID is an identifier, so it is cast to a 64-bit integer rather than "float":
# a 32-bit float cannot represent integers above 2**24 exactly, so large IDs
# would be rounded in the written output.
# NOTE(review): each filter runs before its cast, so a present-but-non-numeric
# value becomes null after the cast and is kept — confirm whether such rows
# should also be dropped.
df = merge
for col_name, col_type in (
    ("POOLID", "int"),
    ("ATTRLENGTH", "float"),
    ("BFSIZE", "float"),
    ("HDRSIZE", "float"),
    ("OBJID", "bigint"),
):
    df = df.filter(df[col_name].isNotNull())
    df = df.withColumn(col_name, df[col_name].cast(col_type))
df.count()

In [None]:
# full_outer_join = BACKUP_OBJECTS
# full_outer_join = full_outer_join.join(NODES, ['NODEID'], how='left')
# full_outer_join = full_outer_join.join(FILESPACES, ['FSID'], how='left')
# full_outer_join = full_outer_join.join(AFBF, full_outer_join.OBJID == AFBF.BFID, how='left')
# full_outer_join = full_outer_join.join(SDRO, ['OBJID'], how='left')
# full_outer_join = full_outer_join.join(SS_POOLS, ['POOLID'], how='left')
# full_outer_join = full_outer_join.join(SD_CHUNK_LOCATIONS, ['POOLID'], how='left')

In [None]:
# df = full_outer_join
# df = df.filter(df.POOLID. isNotNull())
# df = df.withColumn("POOLID", df["POOLID"].cast("int"))
# df = df.filter(df.ATTRLENGTH. isNotNull())
# df = df.withColumn("ATTRLENGTH", df["ATTRLENGTH"].cast("float"))
# df = df.filter(df.BFSIZE. isNotNull())
# df = df.withColumn("BFSIZE", df["BFSIZE"].cast("float"))
# df = df.filter(df.HDRSIZE. isNotNull())
# df = df.withColumn("HDRSIZE", df["HDRSIZE"].cast("float"))
# df = df.filter(df.OBJID. isNotNull())
# df = df.withColumn("OBJID", df["OBJID"].cast("float"))

In [None]:
# Write OBJID, ATTRLENGTH, BFSIZE, HDRSIZE and POOLID as headered CSV part-files
# under output_dir. The save uses the writer's default mode, so it fails if the
# target directory already exists.
feature_columns = ["OBJID", "ATTRLENGTH", "BFSIZE", "HDRSIZE", "POOLID"]
(
    df.select(*feature_columns)
    .write
    .format('com.databricks.spark.csv')
    .options(header='true')
    .save(output_dir + "/4_features")
)

In [None]:
# Row count of the cleaned feature frame that was written out above.
df.count()

In [None]:
# Read the written feature CSVs back as a round-trip check. The path is built
# from output_dir, the same base the write used, rather than a hand-typed copy
# (which contained a doubled slash).
new_df = sqlContext.read.format('com.databricks.spark.csv').option("header", "true").load([output_dir + "/4_features/*.csv"])

In [None]:
# Row count after reading back from disk; expected to match df.count() above.
new_df.count()

In [None]:
# Shut down the SparkContext created at the top of the notebook; run this last.
sc.stop()