# Prismfp Data Engineering Coding Challenge


# Introduction

The below script is created to make an automated application which will take below inputs<br>
1. A start timestamp and end timestamp <br>
2. A set of Instrument codes<br>
3. A set of value fields <br>
The idea here is to fetch all the values of a field(available in the main data file data.txt) by looking up with the metadata file Static.txt and Dynamic.txt

# Understanding and approach

My understanding from the assignment is as follows:<br>
1. There are 3 files, one is the data file, and other two is the metadata file(where field names are defined)
2. Data file has a dynamic structure, means all the rows may not have the same number of columns
3. The ask here is to provide the inputs as mentioned in the introduction, inputs are start and end timestamp, list of instrument codes and names of the fields, and then get an output in a format which can be useful or used by the downstream system

Approach<br>
1. While coding I always try to take optimal usage of memory so that a less scan is used while doing a lookup.
2. Break the dataset into smaller chunks perform transformation and then union it once all the calculation are done
3. Make sure there are no complex joins or lambda transformation that would impact a processing time

# Section1

This section to define all the required libraries, all the required inputs and initialization of all 3 files into dataframe

In [8]:
import pandas as pd
import re
import datetime
from datetime import datetime

###Input values
Timestamp = input("Please enter the start and end timestamp in comma separated format: ")
Timestamp_list=Timestamp.split(",")
Instrument_Codes=input("Please enter the instrument codes in comma separated format: ")
Instrument_list=Instrument_Codes.split(",")
Fields=input("Please enter all the static and dynamic fields in comma separated format: ")
Input_list_of_fields=Fields.split(",")

###The data file structure is dynamic,but it has maximum 26 fields, the number can also be paramiterized for better coding standard
data = pd.read_csv("data.txt", sep="|", low_memory=False,header=None,names=['col' + str(x) for x in range(27) ])
Static = pd.read_csv("StaticFields.Txt", sep="	", low_memory=False,header=None,names=['col' + str(x) for x in range(2) ])
Dynamic = pd.read_csv("DynamicFields.Txt", sep="	", low_memory=False,header=None,names=['col' + str(x) for x in range(2) ])

Please enter the start and end timestamp in comma separated format: 00:00:32:778,00:30:32:778
Please enter the instrument codes in comma separated format: IXN24AJB64000,IXN23AKB64000
Please enter all the static and dynamic fields in comma separated format: Expiration day,Expiration year (modulo 100)


# Section2

# Data Analysis

In this section all the data analysis results and observations are captured<br>
1. To find the grain of the data
2. If there are duplicates(whether they are valid or not)
3. To find if there are any null values present

In [6]:
#data.head(6)
#data.shape
###Duplicate check
###Merge both static and dynamic files and make it a one copy
DF_Union=pd.concat([Static, Dynamic])
#DF_Union.shape----3271
#DF_Union.head(5)
#DF_Union['col1'].unique.count
#len(pd.unique(DF_Union['col1']))----3228
#DF_Union['col1'].duplicated()
#pd.concat(g for _, g in data.groupby("col3") if len(g) > 1)
len(pd.unique(data['col3']))
#pd.concat(g for _, g in DF_Union.groupby(['col0', 'col1']) if len(g) > 1)

1838

# Section 3

This section is to define any user defined function (if required)

In [3]:
def function_value(string: str,search_text: str):
    match = re.search(search_text, string)
    if match:
        position_start = match.start()
        position_end = match.end()
        List=string[position_end:].split('|')
        
        return List[0]
    

# Section 4

Processing and tranformation<br>
Step1: Filter the main data set based on start timestamp and end timestamp and instrument codes and message type,so that the scan has to be perfomed on the optimal set of records<br>
Step2: Once the processing done both the datafrmae can be merged once again

In [9]:
field_list=[]
for i in Instrument_list:
    for n in Input_list_of_fields:
        for index, row in DF_Union.iterrows():
            #print(i+n)
            #print(row['col1'])
            if (row['col1']==n and row['col0'][0]=='S'):
            #field_list.append(row['col0'])
                field_list.append({'Instrument_Code':i,'Field_Name':n,'field_value':'f'+row['col0'][1:],'message_type':'S'})
            elif (row['col1']==n):
                field_list.append({'Instrument_Code':i,'Field_Name':n,'field_value':'f'+row['col0'][1:],'message_type':'D'})
            
#print(field_list)
rdf = pd.DataFrame(field_list).sort_values('message_type')
#rdf = pd.DataFrame(field_list)
print(rdf)

  Instrument_Code                    Field_Name field_value message_type
0   IXN24AJB64000                Expiration day          f4            S
1   IXN24AJB64000  Expiration year (modulo 100)          f6            S
2   IXN23AKB64000                Expiration day          f4            S
3   IXN23AKB64000  Expiration year (modulo 100)          f6            S


In [10]:
date_format = '%H:%M:%S:%f'

Start_timestamp = datetime.strptime(Timestamp_list[0], date_format)
End_timestamp=datetime.strptime(Timestamp_list[1], date_format)
#print(date_obj)

Required_DF_S=data[(pd.to_datetime(data['col1'],format= '%H:%M:%S:%f' ) >= Start_timestamp) & (pd.to_datetime(data['col1'],format= '%H:%M:%S:%f' )<= End_timestamp) & (data['col3'].isin(Instrument_list)) & (data['col2']=='S') ]
Required_DF_D=data[(pd.to_datetime(data['col1'],format= '%H:%M:%S:%f' ) >= Start_timestamp) & (pd.to_datetime(data['col1'],format= '%H:%M:%S:%f' )<= End_timestamp) & (data['col3'].isin(Instrument_list)) & (data['col2']!='S') ]
#Required_DF=data[(pd.to_datetime(data['col1'],format= '%H:%M:%S:%f' ) >= Start_timestamp)]
#Required_DF=data.apply(lambda row:[j for j in row['col3'] if ( j in Instrument_list & (pd.to_datetime(data['col1'],format= '%H:%M:%S:%f' ) >= Start_timestamp) & (pd.to_datetime(data['col1'],format= '%H:%M:%S:%f' )<= End_timestamp))], axis=1)


In [12]:
for m, n in rdf.iterrows():
    search_text=n['field_value']+'='
    List=[]
    for index, row in Required_DF_S.iterrows():
        
        #print('rdf'+n['Instrument_Code'])
        #print('data' +row['col3'])
        if row['col3']==n['Instrument_Code']:
            main_string = '|'.join(map(str, row))
            List.append(function_value(main_string,search_text))
            
    #print(List)
    a=', '.join(List)
    #print(a)
    #n['New_Field_value']=List
    rdf.at[m, 'New_Field_value'] = a
    


In [18]:
rdf
#rdf.drop('field_value', axis=1, inplace=True)
#rdf = rdf.rename(columns={'New_Field_value': 'Field_Value'})

Unnamed: 0,Instrument_Code,Field_Name,message_type,Field_Value
0,IXN24AJB64000,Expiration day,S,19
1,IXN24AJB64000,Expiration year (modulo 100),S,2024
2,IXN23AKB64000,Expiration day,S,20
3,IXN23AKB64000,Expiration year (modulo 100),S,2023


# Conclusion/Summary

This code is unit tested for below senarios<br>
1. If there are multiple values for one instrument_code and Field_name combination then the field_values will be comma separated(we have make it denormalized for now to avoid making multiple repeted lines.
2. To check if the values are populated correctly in all the output fields

Further enhancements to the existing code:<br>
1. Timestamp can also be added along with the proper message type, here only S and D are shown.
2. Both the dataset can be merged, author only did for Static set in the code
3. SQL part not tried as mainly focussed on python

Challenges faced:
1. As mentioned in the assignment, author tried to complete the initial coding in 3 hours, but eventually it take little more time as to find a better way to get the field values for a corresponding field_name

Summary:
The author wants to thank the PrismFP analytics team for an interesting coding challenge which is definitely enjoyable.

