Get all files from histo into cassandra cluster

In [1]:

import pandas as pd
from pathlib import Path

# parse all files from histo_velib\paris csv files and aggregate in a dataframe
paris_part1 = Path("../histo_velib/challenge/part_1/paris/stations")
paris_part2 = Path("../histo_velib/challenge/part_2/paris/stations")

    
def df_generator(dir):
    if issubclass(type(dir),Path) and dir.is_dir():
        for file in dir.iterdir():
            # extract the velib station id from filename
            station_id = int(file.name.split("-")[0])
            
            # create a dataframe from csv
            df = pd.read_csv(file)

            # create a series for the station id entry in the dataframe
            df["station_id"] = pd.Series(data=[station_id for row in range(0,len(df.index))], index=df.index)
            yield df

def get_first(generator):
    for i,df in enumerate(generator):
        if (i == 0):
            return df
        
def remove_unchanged(row):
    return row

df = get_first(df_generator(paris_part1))

def convert_to_date(col,name):
    if col.name == name:
        return pd.to_datetime(col)
    else:
        return col

df = df.apply(convert_to_date,args=["moment"])\
        .set_index("moment")\
        .sort_index()\
        .reset_index()

#type(df.index)
print(len(df))

def filterout(row,df):
    #print(row.name) 
    return row

#for index in range(0,len(df)):
    
    
df = df.head().apply(filterout,axis=1,args=[df.head()])
df

12917


Unnamed: 0,moment,bikes,spaces,station_id
0,2016-08-01 10:51:44,7,13,901
1,2016-08-01 11:01:58,7,13,901
2,2016-08-01 11:12:02,7,13,901
3,2016-08-01 11:14:46,6,14,901
4,2016-08-01 11:24:50,6,14,901


In [22]:
# just for the exercise : concat all dataset in a full indexed dataframe
"""
full_df = pd.concat([df.filter(remove_unchanged)\
                       .apply(convert_to_date,args=["moment"])\
                       .set_index(moment) \
                     for df in df_generator(paris_part1)] )\
            .concat([df.filter(remove_unchanged)\
                       .apply(convert_to_date,args=["moment"])\
                       .set_index(moment) \
                     for df in df_generator(paris_part2)] )
full_df.head()
"""

'\nfull_df = pd.concat([df.filter(remove_unchanged)                       .apply(convert_to_date,args=["moment"])                       .set_index(moment)                      for df in df_generator(paris_part1)] )            .concat([df.filter(remove_unchanged)                       .apply(convert_to_date,args=["moment"])                       .set_index(moment)                      for df in df_generator(paris_part2)] )\nfull_df.head()\n'

# insert data into  cassandra db 
keyspace : velib_db_test
family: station

In [3]:
from cassandra.cluster import Cluster

keyspace = "velib_db_histo"
family = "station_dyn"

cluster = Cluster(["84.39.48.220","84.39.45.143","84.39.45.149","84.39.48.102"])
session = cluster.connect(keyspace)


In [3]:


#insert = "INSERT INTO " + family + " (last_update, available_bikes,available_bike_stands,number) VALUES (?,?,?,?)"
insert = "INSERT INTO " + family + " (moment, bikes,spaces,number) VALUES (?,?,?,?)"
stmt = session.prepare(insert)



def df_generator(dir):
    if issubclass(type(dir),Path) and dir.is_dir():
        for file in dir.iterdir():
            # extract the velib station id from filename
            station_id = int(file.name.split("-")[0])
            
            # create a dataframe from csv
            df = pd.read_csv(file)

            # create a series for the station id entry in the dataframe
            df["station_id"] = pd.Series(data=[station_id for row in range(0,len(df.index))], index=df.index)
            yield df

def prepare_station_data(df):
    return df.apply(convert_to_date,args=["moment"])\
                .set_index("moment")\
                .sort_index()\
                .reset_index()
            
            
            
def insert_into_db(row,batch):        
    batch.add(stmt, list(row))
    #session.execute(stmt,list(row))

# todo manage it through a batch for each station
def ingest_dir(dir):
    for csv in df_generator(dir):
        # create a batch
        batch = BatchStatement()
        prepare_station_data(csv).apply(insert_into_db, axis=1,args=[batch])
        yield batch

test_df = get_first(df_generator(paris_part1))
test_df = prepare_station_data(df.head())
test_df

#df.apply(lambda row: session.execute(stmt,list(row)), axis=1)
#df.apply(insert_into_db, axis=1)
#for station in rows:
#    print(" , ".join([str(station.number), str(station.position)]))

Unnamed: 0,moment,bikes,spaces,station_id
0,2016-08-01 10:51:44,7,13,901
1,2016-08-01 11:01:58,7,13,901
2,2016-08-01 11:12:02,7,13,901
3,2016-08-01 11:14:46,6,14,901
4,2016-08-01 11:24:50,6,14,901


In [None]:
from cassandra.query import BatchStatement
from cassandra import ConsistencyLevel


consistency_level=ConsistencyLevel.QUORUM


#test_df.apply(insert_into_db, axis=1,args=[batch])
#TODO increase the execution time_out


# execute all batch/station for the first part
for batch in ingest_dir(paris_part1):
    session.execute(batch)

# execute all batch/station for the second part    
for batch in ingest_dir(paris_part2):
    session.execute(batch)
 
