In [47]:
# Essential libraries
import json
import pandas as pd
import datetime
import uuid

In [13]:
# Load the raw stage-1 data from ./data.json.
# The parsed JSON object is kept in `stages`; the file is closed when the `with` block exits.
with open("./data.json", "r") as read_file:
    stages = json.load(read_file)


In [14]:
# Making sense of the data: inspect the object loaded from the JSON file.
# (`stages` is the name bound by the loading cell; only the last expression is displayed.)
type(stages)
stages.keys()

dict_keys(['data'])

In [18]:
# The actual events are stored under the "data" key; preview the first ten.
events = stages["data"]
events[:10]

[{'stageNum': 1, 'timestamp': '01-01-2018 03:00:15'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:54'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:02'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:11'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:13'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:37'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:08'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:38'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:30'},
 {'stageNum': 1, 'timestamp': '01-01-2018 03:00:46'}]

In [22]:
# What does each event look like? Inspect the event at index 1: its type and its keys.
# Only the last expression (the keys) is shown as the cell output.
type(events[1])
events[1].keys()

dict_keys(['stageNum', 'timestamp'])

In [20]:
# Keys of the event at index 1.
events[1].keys()

dict_keys(['stageNum', 'timestamp'])

In [27]:
# Turn the list of event dicts into a DataFrame (one row per event) and preview ten rows.
df_events = pd.DataFrame(events)
df_events.head(10)

Unnamed: 0,stageNum,timestamp
0,1,01-01-2018 03:00:15
1,1,01-01-2018 03:00:54
2,1,01-01-2018 03:00:02
3,1,01-01-2018 03:00:11
4,1,01-01-2018 03:00:13
5,1,01-01-2018 03:00:37
6,1,01-01-2018 03:00:08
7,1,01-01-2018 03:00:38
8,1,01-01-2018 03:00:30
9,1,01-01-2018 03:00:46


In [32]:
# Write the stage-1 events to a CSV file.
# index=False keeps the DataFrame's row index out of the file.
df_events.to_csv("./data_stage1.csv", index=False)

In [54]:
# Function to add a unique ID to every event in a CSV file
def add_id(data):
    """Give every row of a CSV file a fresh UUID and rewrite the file in place.

    Parameters
    ----------
    data : str
        Path of the CSV file to read. The same path is overwritten with the
        result (written without the DataFrame index).

    Returns
    -------
    str
        ``data`` unchanged, i.e. the path of the file that was rewritten.

    Notes
    -----
    The IDs are random (``uuid.uuid4``). An existing ``id`` column is
    replaced, so calling this again on the same file re-issues every ID.
    """
    print('Starting with the datafile: ', data)
    df = pd.read_csv(data)
    # Build the whole column in one pass instead of writing cell-by-cell via
    # .loc while iterating with iterrows(): pandas warns against modifying a
    # frame that is being iterated, and the row loop is needlessly slow.
    # The UUIDs are stored as strings, which is how they end up in the CSV anyway.
    df["id"] = [str(uuid.uuid4()) for _ in range(len(df))]
    df.to_csv(data, index=False)
    print('Output datafile: ', data)
    return data

In [55]:
# Add a unique ID to every stage-1 event.
# add_id rewrites the file it is given, so ./data_stage1.csv gains an `id` column in place.
add_id("./data_stage1.csv")

Starting with the datafile:  ./data_stage1.csv
Output datafile:  ./data_stage1.csv


'./data_stage1.csv'

In [57]:
# Now we process each event and take it through a stage.
# This has to be repeatable, as events are taken through multiple stages:
# the output file of one call can be passed as the input of the next.

def process(data, stage = 2):
    """Run every event in a CSV file through one processing stage.

    For now "processing" only stamps each record with the new stage number
    and the current time.

    Parameters
    ----------
    data : str
        Path of the input CSV file.
    stage : int, optional
        Numeric stage the events are moved to (default 2). It is written to
        the ``stageNum`` column and used to build the output file name.

    Returns
    -------
    str
        Path of the CSV file written, ``./data_stage<stage>.csv`` (relative
        to the current working directory).
    """
    print('Starting with the datafile: ', data)
    print('Processing stage: ', stage)
    df_out = pd.read_csv(data)
    out_filename_string = "./data_stage" + str(stage) + ".csv"

    # TODO: Do fancy things during the data processing stage with the events
    # TODO: Do we need to add some delay
    # Columns are assigned as a whole rather than cell-by-cell with .loc inside
    # iterrows(): pandas warns against modifying a frame while iterating over it.
    df_out["stageNum"] = stage

    # Each record still gets its own "now", in ISO 8601 format, as if the
    # events were handled one after another.
    df_out["timestamp"] = [
        datetime.datetime.now().isoformat() for _ in range(len(df_out))
    ]

    # Do not write the index or it'll keep adding one to each stage
    df_out.to_csv(out_filename_string, index=False)
    print('Output datafile: ', out_filename_string)
    return out_filename_string

# Run the default stage (2) on the stage-1 file.
process("./data_stage1.csv")

In [58]:
# Let's build a pipeline now
def pipeline(first_stage, num_stages, data, isSerial = True):
    """Chain `process` over consecutive stages, feeding each output into the next.

    Parameters
    ----------
    first_stage : int
        Stage number used for the first `process` call; it is incremented
        by one for every following call.
    num_stages : int
        How many stages to run.
    data : str
        Path of the CSV file the first stage reads.
    isSerial : bool, optional
        Accepted but not used by this function.

    Returns
    -------
    str
        Path returned by the last `process` call, or `data` itself when no
        stage was run.
    """
    current_file, stage_no = data, first_stage
    for _ in range(num_stages):
        current_file = process(current_file, stage=stage_no)
        stage_no += 1
    return current_file

In [59]:
# Try the pipeline: run 2 stages, starting at stage 3, on the stage-2 file.
pipeline(3, 2, "./data_stage2.csv")

Starting with the datafile:  ./data_stage2.csv
Processing stage:  3
Output datafile:  ./data_stage3.csv
Starting with the datafile:  ./data_stage3.csv
Processing stage:  4
Output datafile:  ./data_stage4.csv


'./data_stage4.csv'