In [1]:
import pandas as pd
import re
import json
from collections import Counter
from datetime import datetime
from pandas.io.json import json_normalize

# File paths used by the notebook.
DATA_CLEAN_MESSAGES = 'clean_messages.csv'
DATA_RAW_MESSAGES = 'raw_messages.csv'
DATA_WEATHER = 'weather_data.json'

# Names given, in order, to the comma-separated fields of each raw message.
EXPANDED_COLUMNS_NAME = [
    'status', 'lat', 'lat_dir', 'lon', 'lon_dir', 'speed',
    'track_degrees', 'date', 'mag_var', 'mag_var_dir',
]


In [2]:
# The header of the first column arrives as '\tdevice_id' (hence the rename).
# Reading that column as str keeps ids such as '0001' and 'st-1a2090' verbatim
# instead of leaving its dtype to pandas' chunk-wise type inference.
df_rm = pd.read_csv(DATA_RAW_MESSAGES, dtype={'\tdevice_id': str}).rename(columns={'\tdevice_id': 'device_id'})

## Exercise 1: Reading the input data

In [3]:
def unwanted_chars_from_column(df_coulmn=df_rm['raw_message'],regex_allowed_chars="[^0-9a-zA-Z,.]"):
    """Collect the distinct substrings of a text column that match a regex.

    Despite its name, ``regex_allowed_chars`` is the pattern of what to *report*:
    the default matches any single character outside ``[0-9a-zA-Z,.]``.

    :param df_coulmn: pandas Series of strings to scan; missing values are skipped.
    :param regex_allowed_chars: regular expression whose matches are collected.
    :return: sorted list of the distinct matches (sorted so the result does not
        depend on set iteration order, which varies between runs).
    """
    # Series.str.findall applies re.findall to every value. Each match is kept
    # as-is; the previous ' '.join(match) inserted spaces into multi-character
    # matches ('ab' -> 'a b').
    matches_per_row = df_coulmn.dropna().astype(str).str.findall(regex_allowed_chars)
    return sorted({match for row_matches in matches_per_row for match in row_matches})
unwanted_chars=unwanted_chars_from_column()
unwanted_chars

['%', '&', '*', '@', '$']

In [4]:
def expand_column_with_delimiter(column=df_rm['raw_message'],
                                 delimiter=',',
                                 expand=True,
                                 new_columns_names=EXPANDED_COLUMNS_NAME):
    """Split a string column on ``delimiter``.

    :param column: pandas Series of strings.
    :param delimiter: separator passed to ``Series.str.split``.
    :param expand: if True, return one column per field; the i-th field is named
        ``new_columns_names[i]``, and fields beyond the provided names keep their
        integer position as column name. If False, return a Series of lists.
    :param new_columns_names: names for the expanded fields, in order.
    :return: DataFrame (``expand=True``) or Series of lists (``expand=False``).
    """
    parts = column.str.split(delimiter, expand=expand)
    if not expand:
        # A Series of lists has no positional columns to rename, and
        # Series.rename does not accept ``columns=`` (it raised TypeError before).
        return parts
    return parts.rename(columns=dict(enumerate(new_columns_names)))

expand_column_with_delimiter();

In [5]:
def concat_main_df_with_expanded_columns(df1,df2):
    """Place the columns of ``df2`` to the right of those of ``df1``.

    Rows are aligned on the index (``pd.concat`` along the column axis).

    :param df1: left-hand frame.
    :param df2: right-hand frame.
    :return: new DataFrame with the columns of both inputs.
    """
    frames = [df1, df2]
    return pd.concat(frames, axis='columns')
# Attach the split message fields to the raw frame. Any previously attached
# expanded columns are dropped first so that re-running this cell does not
# create duplicate 'status', 'lat', ... columns.
df_rm = concat_main_df_with_expanded_columns(
    df_rm.drop(columns=EXPANDED_COLUMNS_NAME, errors='ignore'),
    expand_column_with_delimiter(),
)

In [6]:
def replace_unwatned_characters(df=df_rm,columns_names=EXPANDED_COLUMNS_NAME,chars=None):
    """Remove unwanted characters from the expanded message columns.

    :param df: frame holding ``columns_names``; it is modified in place and returned.
        (The previous version ignored this argument and always edited the
        global ``df_rm``.)
    :param columns_names: columns to clean.
    :param chars: iterable of characters to delete; defaults to the notebook-level
        ``unwanted_chars`` computed in Exercise 1.
    :return: ``df`` (the same object) with the characters removed.
    """
    if chars is None:
        chars = unwanted_chars
    chars = ''.join(chars)
    if not chars:
        # Nothing to remove; an empty character class would be an invalid regex.
        return df
    # Escape every character and wrap them in one character class. The previous
    # pattern was str(list_of_chars), which as a class also removed quotes,
    # commas and spaces.
    pattern = '[' + re.escape(chars) + ']'
    df[columns_names] = df[columns_names].replace(to_replace=pattern, value='', regex=True)
    return df
df1=replace_unwatned_characters()
df1.head(5)

Unnamed: 0,device_id,datetime,address_ip,address_port,original_message_id,raw_message,status,lat,lat_dir,lon,lon_dir,speed,track_degrees,date,mag_var,mag_var_dir
0,1,1550066999,172.19.0.17,4007,1550070599576-0,"A,5$1.31%8308166&6666@7,*N,4.31572216@6666666,...",A,51.31830816666667,N,4.315722166666666,E,0.0,1.59,150218,0.8,E
1,1,1550067661,172.19.0.16,4007,1550071261429-0,"A,5$1.31%8308166&6666@7,*N,4.31572216@6666666,...",A,51.31830816666667,N,4.315722166666666,E,0.0,1.59,150218,0.8,E
2,1,1550067048,172.19.0.17,4007,1550070648974-0,"A,51.3183085,N,4.315720833333334,E,0.0,5.25,15...",A,51.3183085,N,4.315720833333334,E,0.0,5.25,150218,0.8,E
3,1,1550069034,172.19.0.16,4007,1550072634730-0,"A,51.3183085,N,4.315720833333334,E,0.0,5.25,15...",A,51.3183085,N,4.315720833333334,E,0.0,5.25,150218,0.8,E
4,1,1550069696,172.19.0.16,4007,1550073296534-0,"A,51.3183085,N,4.315720833333334,E,0.0,5.25,15...",A,51.3183085,N,4.315720833333334,E,0.0,5.25,150218,0.8,E


## Exercise 2: calculating metrics 

### 2.1 Number of ships

In [7]:
## Check the number of rows for each device_id
Counter(df1['device_id'].tolist())

Counter({'0001': 4110, 'st-1a2090': 23718, '1': 1226})

It seems that device_id '1' refers to the same ship as '0001': for some reason, the leading zeros were dropped.
We test this hypothesis below: if both ids report from the same IP addresses, the assumption is plausible. More domain knowledge about how this data is collected would help confirm it.


In [8]:
# Distinct IP addresses reported under each of the two candidate device ids.
address_ip_0001 = df1.loc[df1['device_id'] == '0001', 'address_ip'].unique()
address_ip_1 = df1.loc[df1['device_id'] == '1', 'address_ip'].unique()
(sorted(address_ip_0001), sorted(address_ip_1))

(['172.18.0.14',
  '172.18.0.15',
  '172.18.0.17',
  '172.19.0.11',
  '172.19.0.15',
  '172.19.0.16',
  '172.19.0.17'],
 ['172.18.0.14',
  '172.18.0.17',
  '172.19.0.11',
  '172.19.0.15',
  '172.19.0.16',
  '172.19.0.17'])

Based on the result, only the IP '172.18.0.15' is missing from address_ip_1, which could be due to a data-collection issue. We therefore conclude that there are two ships and map device_id '1' to '0001'.

In [9]:
# Presumably the same ship whose leading zeros were lost: map '1' onto '0001'.
df1['device_id'] = df1['device_id'].replace({'1': '0001'})
Counter(df1['device_id'])

Counter({'0001': 5336, 'st-1a2090': 23718})

**we have two ships: 0001 & st-1a2090**

### 2.2 Minimum and maximum speed per day (ship st-1a2090)

In [10]:
def create_time_column(df=df1,col_name='hour',time='%Y-%m-%d %H',tz=None):
    """Add a column with each row's epoch ``datetime`` formatted via strftime.

    :param df: frame with an epoch-seconds ``datetime`` column; modified in place.
    :param col_name: name of the column to create.
    :param time: strftime format, e.g. '%Y-%m-%d %H' (default) or
        '%Y-%m-%d %H:%M:%S' for full timestamps.
    :param tz: optional tzinfo. ``None`` (default) keeps the original behaviour of
        using the machine's local timezone, which makes the result depend on where
        the notebook runs; pass ``datetime.timezone.utc`` for reproducible output.
    :return: ``df``.
    """
    df[col_name] = df['datetime'].apply(lambda ts: datetime.fromtimestamp(ts, tz).strftime(time))
    return df
df=create_time_column()

In [11]:
def create_correct_date_column(df=df1,col_name='correct_date',tz=None):
    """Add a 'YYYY-MM-DD' date column derived from the epoch ``datetime`` column.

    The date comes from the message timestamp (column ``datetime``), not from
    the ``date`` field split out of the raw message.

    :param df: frame with an epoch-seconds ``datetime`` column; modified in place.
    :param col_name: name of the column to create.
    :param tz: optional tzinfo. ``None`` (default) uses the machine's local
        timezone, as before; pass ``datetime.timezone.utc`` for reproducible output.
    :return: ``df``.
    """
    df[col_name] = df['datetime'].apply(lambda ts: datetime.fromtimestamp(ts, tz).strftime('%Y-%m-%d'))
    return df
df=create_correct_date_column()

In [12]:
def min_max_column_over_day(df=df1,ship_id='st-1a2090',column='speed',date='correct_date'):
    """Daily maximum and minimum of ``column`` for a single ship.

    :param df: frame with ``device_id``, ``date`` and ``column`` columns. As in the
        original implementation, ``df[column]`` is converted to float in place.
    :param ship_id: ``device_id`` value to keep.
    :param column: numeric column to summarise.
    :param date: column holding the day each row belongs to.
    :return: DataFrame with columns ``[date, 'max_<column>', 'min_<column>']``,
        one row per day, ordered by day.
    """
    df[column] = df[column].astype('float')
    ship_rows = df.loc[df['device_id'] == ship_id, [date, column]]
    # One groupby pass computes both statistics. The previous version renamed the
    # min column with a hard-coded 'speed' (so any other column failed in the
    # merge) and passed the column name as GroupBy.max's numeric_only argument.
    return (ship_rows.groupby(date)[column]
            .agg(['max', 'min'])
            .rename(columns={'max': 'max_' + column, 'min': 'min_' + column})
            .reset_index())
min_max_column_over_day()

Unnamed: 0,correct_date,max_speed,min_speed
0,2019-02-10,4.97,4.95
1,2019-02-11,1.49,0.0
2,2019-02-12,0.02,0.0
3,2019-02-13,11.5,0.0
4,2019-02-14,9.37,0.0


In [13]:
# Persist the 2.2 result. index=False: the frame has a fresh RangeIndex
# (reset_index), which would otherwise be written as an unnamed leading column.
min_max_column_over_day().to_csv('2.2_max_min_speed.csv', index=False)

### 2.3 Average hourly speed per ship on 2019-02-13

In [14]:
def average_column_for_ships(df=df1,date='2019-02-13',column='speed'):
    """Hourly mean of ``column`` for every ship on one day.

    :param df: frame with ``correct_date``, ``device_id``, ``hour`` and ``column``
        columns. As in the original implementation, ``df[column]`` is converted
        to float in place.
    :param date: value of ``correct_date`` to keep ('YYYY-MM-DD').
    :param column: numeric column to average.
    :return: DataFrame with columns ``['device_id', 'hour', column]``.
    """
    df[column] = df[column].astype('float')
    day_rows = df[df['correct_date'] == date]
    # Aggregate the requested column. The previous version always selected
    # 'speed' (so ``column`` was ignored) and passed the column name as
    # GroupBy.mean's numeric_only argument.
    return day_rows.groupby(['device_id', 'hour'])[column].mean().reset_index()
average_column_for_ships()

Unnamed: 0,device_id,hour,speed
0,0001,2019-02-13 09,0.011667
1,0001,2019-02-13 10,0.003333
2,0001,2019-02-13 11,0.01163
3,0001,2019-02-13 12,0.010591
4,0001,2019-02-13 13,0.0128
5,0001,2019-02-13 14,0.008848
6,0001,2019-02-13 15,0.01253
7,0001,2019-02-13 16,0.011339
8,st-1a2090,2019-02-13 09,9.655
9,st-1a2090,2019-02-13 10,4.9809


In [15]:
# Persist the 2.3 result. index=False: the frame has a fresh RangeIndex
# (reset_index), which would otherwise be written as an unnamed leading column.
average_column_for_ships().to_csv('2.3_avg_speed.csv', index=False)

## Exercise 3: Weather data 

In [16]:
# Raw weather responses; each row carries its hourly records in a 'data' field.
df_w = pd.read_json(path_or_buf=DATA_WEATHER)

In [17]:
def flat_table_full_weather_conditions(df=df_w):
    """Flatten the nested weather records into one row per record.

    For every row of ``df``, the contents of its ``data`` field are normalised
    with ``pd.json_normalize`` (nested keys become dotted columns such as
    ``weather.icon``). The result is placed side by side with the parent row,
    and the parent values are repeated on every record row.

    :param df: frame read from the weather JSON; must have a ``data`` column.
    :return: concatenation of the per-row flat frames, without the helper
        ``index`` column (the index restarts at 0 for each parent row, as before);
        an empty DataFrame if ``df`` has no rows.
    """
    pieces = []
    for pos in range(len(df)):
        parent = df.iloc[[pos]].reset_index()
        records = pd.json_normalize(parent['data'][0])
        flat = pd.merge(parent, records, how='outer', left_index=True, right_index=True)
        # After the outer merge the parent values exist only on the first row.
        # Forward-fill only those columns; filling the record columns as well
        # would copy a missing measurement from the previous record.
        parent_cols = [c for c in flat.columns if c not in records.columns]
        flat[parent_cols] = flat[parent_cols].ffill()
        pieces.append(flat)
    if not pieces:
        return pd.DataFrame()
    # DataFrame.append was removed in pandas 2.0, and appending inside a loop is
    # quadratic; concatenate once instead.
    return pd.concat(pieces).drop(columns=['index'])

df_flat = flat_table_full_weather_conditions()

In [18]:
def round_time_nearest_hour(time):
    """Round an epoch timestamp to the nearest whole hour, in local time.

    Minutes >= 30 round up; seconds are ignored, so 10:29:59 rounds down.

    :param time: POSIX timestamp in seconds.
    :return: naive local ``datetime`` on the hour. Rounding up from 23:30 or
        later rolls over to 00:00 of the next day. The previous version reset
        the hour to 0 on the same day, moving the result about 24 hours back.
    """
    from datetime import timedelta

    local = datetime.fromtimestamp(time)
    on_the_hour = local.replace(minute=0, second=0, microsecond=0)
    return on_the_hour + timedelta(hours=local.minute // 30)

# Bucket messages and weather records onto the same hourly key for the join.
df1['nearest_hour'] = df1['datetime'].apply(round_time_nearest_hour)
df_flat['nearest_hour'] = df_flat['ts'].apply(round_time_nearest_hour)

In [19]:
def round_gps(column,decimals=4):
    """Cast a coordinate column to float and round it.

    :param column: pandas Series of coordinates (numbers or numeric strings).
    :param decimals: number of decimal places to keep.
    :return: new float Series; the input Series is not modified.
    """
    as_float = column.astype(float)
    return as_float.round(decimals)

# Round both sources to the same precision so positions can be matched by
# equality in the join below.
df1['lat'], df1['lon'] = round_gps(df1['lat']), round_gps(df1['lon'])
df_flat['lat'], df_flat['lon'] = round_gps(df_flat['lat']), round_gps(df_flat['lon'])

In [20]:
def clean_join_input_data_with_flat_data(df1=df1, df2=df_flat, how='left', join_list=['lat', 'lon', 'nearest_hour']):
    """Join messages with weather records and keep only fully populated rows.

    Note that the join on ``lat``/``lon`` is an exact equality on floats, so
    both frames must have been rounded identically beforehand.

    :param df1: left frame (messages).
    :param df2: right frame (flattened weather records).
    :param how: merge strategy passed to ``pd.merge``.
    :param join_list: key columns shared by both frames (not modified).
    :return: merged frame without any row that contains a missing value in any
        column (with ``how='left'`` this also removes unmatched messages).
    """
    joined = pd.merge(df1, df2, how=how, on=join_list)
    return joined.dropna(how='any')

clean_join_input_data_with_flat_data()

Unnamed: 0,device_id,datetime_x,address_ip,address_port,original_message_id,raw_message,status,lat,lat_dir,lon,...,dni,azimuth,datetime_y,temp,precip,clouds,ts,weather.icon,weather.code,weather.description
28,st-1a2090,1550052436,172.23.0.1,4007,1550056036964-0,"A,5$1.90%3500666&6666@66*,N,5.5429625@,E,0.0,5...",A,51.9035,N,5.5430,...,697.54,150.99,2019-02-13:10,3.7,0.0,88.0,1.550052e+09,c04d,804.0,Overcast clouds
29,st-1a2090,1550052437,172.23.0.1,4007,1550056037556-0,"A,51.903500666666666,N,5.5429623333333335,E,0....",A,51.9035,N,5.5430,...,697.54,150.99,2019-02-13:10,3.7,0.0,88.0,1.550052e+09,c04d,804.0,Overcast clouds
532,st-1a2090,1550052376,172.23.0.1,4007,1550055976972-0,"A,5$1.90%3498,N,&5.54@29*67833333333,@E,0.0,25...",A,51.9035,N,5.5430,...,697.54,150.99,2019-02-13:10,3.7,0.0,88.0,1.550052e+09,c04d,804.0,Overcast clouds
533,st-1a2090,1550052383,172.23.0.1,4007,1550055983470-0,"A,51.903498,N,5.542968833333333,E,0.0,249.97,1...",A,51.9035,N,5.5430,...,697.54,150.99,2019-02-13:10,3.7,0.0,88.0,1.550052e+09,c04d,804.0,Overcast clouds
534,st-1a2090,1550052383,172.23.0.1,4007,1550055983976-0,"A,51.903498,N,5.542968833333333,E,0.0,253.24,1...",A,51.9035,N,5.5430,...,697.54,150.99,2019-02-13:10,3.7,0.0,88.0,1.550052e+09,c04d,804.0,Overcast clouds
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
29989,st-1a2090,1550067836,172.24.0.1,4007,1550071436892-0,"A,5$1.82%6647833&3333@3,*N,4.95665216@6666666,...",A,51.8266,N,4.9567,...,678.01,212.44,2019-02-13:14,8.3,0.0,100.0,1.550066e+09,c04d,804.0,Overcast clouds
31405,st-1a2090,1550048106,172.20.0.1,4007,1550051706899-2,"A,51.87622233333333,N,5.7971985,E,9.68,282.64,...",A,51.8762,N,5.7972,...,602.91,137.01,2019-02-13:09,4.4,0.0,86.0,1.550048e+09,c04d,804.0,Overcast clouds
31685,st-1a2090,1550063086,172.23.0.1,4007,1550066686863-0,"A,51.817197166666666,N,5.1726453333333335,E,9....",A,51.8172,N,5.1726,...,731.25,197.57,2019-02-13:13,7.2,0.0,100.0,1.550063e+09,c04d,804.0,Overcast clouds
31686,st-1a2090,1550063087,172.23.0.1,4007,1550066687456-0,"A,51.817201,N,5.172609333333333,E,9.82,280.19,...",A,51.8172,N,5.1726,...,731.25,197.57,2019-02-13:13,7.2,0.0,100.0,1.550063e+09,c04d,804.0,Overcast clouds


## Exercise 4: SQL

In [27]:
df1.iloc[:1]

Unnamed: 0,device_id,datetime,address_ip,address_port,original_message_id,raw_message,status,lat,lat_dir,lon,lon_dir,speed,track_degrees,date,mag_var,mag_var_dir,hour,correct_date,nearest_hour
0,1,1550066999,172.19.0.17,4007,1550070599576-0,"A,5$1.31%8308166&6666@7,*N,4.31572216@6666666,...",A,51.3183,N,4.3157,E,0.0,1.59,150218,0.8,E,2019-02-13 15,2019-02-13,2019-02-13 15:00:00


In [87]:
def dic_name_and_type(df,primary_key):
    """Map every column of ``df`` to a VARCHAR type wide enough for its values.

    The width is the length of the longest string representation in the column
    (values go through ``astype('str')``, so a missing value counts as 'nan').
    It is at least 1, so an empty or all-empty-string column still yields a
    valid ``VARCHAR(n)`` instead of 'VARCHAR(nan)' or 'VARCHAR(0)'.

    :param df: DataFrame to describe.
    :param primary_key: column name to suffix with ' PRIMARY KEY', or None.
    :return: dict ``{column_name: 'VARCHAR(n)'}`` in column order.
    :raises KeyError: if ``primary_key`` is not a column of ``df``.
    """
    dic_db = {}
    # Iterate the frame that was passed in; the previous version read the
    # global df1 here and ignored the ``df`` argument.
    for name in df.columns:
        longest = df[name].astype('str').str.len().max()
        width = max(int(longest), 1) if pd.notna(longest) else 1
        dic_db[name] = 'VARCHAR(' + str(width) + ')'
    if primary_key is not None:
        dic_db[primary_key] = dic_db[primary_key] + ' PRIMARY KEY'
    return dic_db
dic_table_info = dic_name_and_type(df1, primary_key=None)

In [92]:
# Scratch check of the `primary_key is not None` guard in dic_name_and_type
# for the None passed above; evaluates to False and has no side effects.
(None is not None)

False

In [97]:
# NOTE: sql_create_table_from_dic is defined in a later cell (In [94]); move
# that cell above this one for Restart & Run All to succeed.
print(sql_create_table_from_dic(dic_table_info, table_name='ex4'))

CREATE TABLE IF NOT EXISTS ex4(device_id VARCHAR(9),
datetime VARCHAR(10),
address_ip VARCHAR(13),
address_port VARCHAR(4),
original_message_id VARCHAR(15),
raw_message VARCHAR(75),
status VARCHAR(1),
lat VARCHAR(7),
lat_dir VARCHAR(1),
lon VARCHAR(6),
lon_dir VARCHAR(1),
speed VARCHAR(5),
track_degrees VARCHAR(6),
date VARCHAR(6),
mag_var VARCHAR(3),
mag_var_dir VARCHAR(1),
hour VARCHAR(13),
correct_date VARCHAR(10),
nearest_hour VARCHAR(19));


In [94]:
def sql_create_table_from_dic(dic,table_name):
    """Build a ``CREATE TABLE IF NOT EXISTS`` statement from a column-to-type dict.

    Each ``name type`` pair is separated from the next by a comma and a newline.
    Names and types are inserted verbatim, without quoting or escaping.

    :param dic: mapping of column name to SQL type string, in column order.
    :param table_name: table name (converted with ``str``).
    :return: the DDL statement, terminated by ';'.
    """
    column_defs = []
    for col_name, col_type in dic.items():
        column_defs.append('{} {}'.format(col_name, col_type))
    body = ',\n'.join(column_defs)
    return 'CREATE TABLE IF NOT EXISTS ' + str(table_name) + '(' + body + ');'


In [104]:
from sqlalchemy import create_engine

# Load df1 into an in-memory SQLite database ('sqlite://' has no file path, so
# nothing is written to disk).
engine = create_engine('sqlite://', echo=False)

# The transaction's connection gets its own name. The original `as connection`
# overwrote the sqlite3 `connection` used by the other Exercise 4 cells, and
# the execution counts show this cell ran after them (In [104]).
with engine.begin() as sa_connection:
    df1.to_sql(name='ex4', con=sa_connection, if_exists='append', index=False)

In [102]:
import sqlite3

# Open (creating it if needed) the on-disk SQLite database for Exercise 4, plus
# a cursor on it. Both are left open on purpose: later cells write table ex4
# through `connection` and read its schema through `crsr`.
connection = sqlite3.connect("xomnia.db")
crsr = connection.cursor()

# The DDL could also be generated from the inferred column types:
# sql_create_table_ex4=sql_create_table_from_dic(dic_table_info,'ex4')

In [103]:
# Write df1 into table ex4 of the SQLite file opened above.
# if_exists='replace' (instead of 'append') makes the cell idempotent:
# running it again rebuilds ex4 instead of duplicating every row.
df1.to_sql(name='ex4', con=connection, if_exists='replace', index=False)

# Commit explicitly so the data reaches the file even if the transaction was
# left open. The connection itself stays open because
# get_column_names_from_db_table(crsr, 'ex4') still queries through `crsr`.
connection.commit()

In [74]:
def get_column_names_from_db_table(sql_cursor, table_name):
    """
    Scrape the column names from a database table to a list
    :param sql_cursor: sqlite cursor
    :param table_name: table name to get the column names from
    :return: a list with table column names, in declaration order
             (empty if the table does not exist)
    """
    # The table-valued form of PRAGMA table_info accepts a bound parameter, so
    # the table name is never spliced into the SQL text. Names with spaces or
    # quotes work, and no SQL injection is possible. Requires SQLite >= 3.16.
    sql_cursor.execute('SELECT name FROM pragma_table_info(?) ORDER BY cid;', (table_name,))
    return [row[0] for row in sql_cursor.fetchall()]

get_column_names_from_db_table(sql_cursor=crsr, table_name='ex4')

['device_id',
 'datetime',
 'address_ip',
 'address_port',
 'original_message_id',
 'raw_message',
 'status',
 'lat',
 'lat_dir',
 'lon',
 'lon_dir',
 'speed',
 'track_degrees',
 'date',
 'mag_var',
 'mag_var_dir',
 'hour',
 'correct_date',
 'nearest_hour']