# Imports

In [1]:
from os import listdir, path

from buspkg import DBFunctions as db
from buspkg import ProcessData

import pandas as pd
import numpy as np

# Connect to DB

In [2]:
# Obtain the connection provided by db.GetDefaultConnection() and a cursor on it.
# Both objects are reused by the write and index cells further down; the
# connection is closed in the final cell of the notebook.
conn = db.GetDefaultConnection()
cur = conn.cursor()

## Retrieving file paths

In [3]:
# Directory holding the raw CSV exports (relative to the notebook location).
dataPath = "../data/rawfiles/"

# os.listdir returns entries in arbitrary order, so the listing is sorted to make
# filePaths[0] (used as the sample file below) and the processing order
# reproducible across runs and machines. Sub-directories are skipped.
filePaths = sorted(
    path.join(dataPath, entryName)
    for entryName in listdir(dataPath)
    if path.isfile(path.join(dataPath, entryName))
)

## Discover encoding of csv files

The first step is to make sure each file is read with its correct encoding. This matters because reading a file with the wrong encoding can lose information or raise decoding errors.

In [4]:
# Map every raw file path to the detection result returned for that file.
encodings = {
    rawFile: ProcessData.DiscoverFileEncoding(rawFile)
    for rawFile in filePaths
}

## Visualizing basic information about the data

Load the first file to understand column ranges, distributions, and types.

In [5]:
# Columns kept as text so pandas does not reinterpret them as numbers or dates
# (which would, for example, drop leading zeros).
textColumns = [
    'codigo_viagem',
    'cnpj',
    'placa',
    'nu_linha',
    'tipo_viagem',
    'data_viagem_programada',
    'hora_viagem_programada',
    'data_inicio_viagem',
    'data_fim_viagem',
    'sentido_linha',
    'numero_imei',
    'in_transbordo',
    'codigo_viagem_transbordo',
]
# Columns parsed as 64-bit floats.
floatColumns = ['latitude', 'longitude', 'pdop']

# Explicit dtype for every column passed to read_csv.
colTypes = {
    **dict.fromkeys(textColumns, 'str'),
    **dict.fromkeys(floatColumns, np.float64),
}

# Sample file: the first discovered path, read with the encoding detected for it.
filePath = filePaths[0]
encoding = encodings[filePath]
basicStatsDF = pd.read_csv(
    filePath,
    sep=';',
    header=0,
    dtype=colTypes,
    encoding=encoding['encoding'],
)

In [6]:
# Text columns whose value lengths are inspected, in the order they are reported.
lengthCheckedColumns = [
    "codigo_viagem",
    "cnpj",
    "nu_linha",
    "placa",
    "numero_imei",
    "tipo_viagem",
]

# For each column, print how many rows have each string length. One loop
# replaces six copy-pasted stanzas; the printed output is unchanged (a blank
# line between reports, none after the last one).
for position, columnName in enumerate(lengthCheckedColumns):
    if position:
        print()
    valueLengths = basicStatsDF[columnName].str.len()
    print(valueLengths.groupby(valueLengths).value_counts())

codigo_viagem  codigo_viagem
7              7                100251
Name: count, dtype: int64

cnpj  cnpj
14    14      100251
Name: count, dtype: int64

nu_linha  nu_linha
8         8           100251
Name: count, dtype: int64

placa  placa
7      7        100251
Name: count, dtype: int64

numero_imei  numero_imei
15           15             100251
Name: count, dtype: int64

tipo_viagem  tipo_viagem
1            1              100251
Name: count, dtype: int64


## Processing and writing data to DB

As data is read, it is processed and saved to the DB in batches. Since there are a lot of files, errors may occur during processing or writing. For this reason, a pickle file is used as a checkpoint to record which files were successfully written to the database.

In [7]:
# Destination table name passed to db.WriteBatchToDB in the processing cell.
tableName = "RegularTrips"
# Encoding taken from the first entry of `encodings`, i.e. the first discovered file only.
# NOTE(review): next() raises StopIteration here when `encodings` is empty (no raw files found).
encoding = next(iter(encodings.values()))['encoding']
# Path of the pickle checkpoint that is loaded before and saved during processing.
pickleFilePath = "../data/checkpoints/RawDataProcessingCheckpoint.pickle"

In [8]:
# Resume from the checkpoint: paths recorded there are skipped below.
# presumably GetPickleCheckpoint returns a set of paths, or something falsy when
# no checkpoint exists yet — verify against ProcessData.
processedFiles = ProcessData.GetPickleCheckpoint(pickleFilePath)
if not processedFiles:
    processedFiles = set()

for filePath in filePaths:
    if filePath in processedFiles:
        continue

    # Read each file with the encoding detected for that file. Previously the
    # encoding of the first file was applied to every file, which defeats the
    # per-file detection step. The notebook-wide `encoding` is kept only as a
    # fallback for files without a usable detection result.
    detected = encodings.get(filePath) or {}
    fileEncoding = detected.get('encoding') or encoding

    try:
        batch = ProcessData.ProcessRawTripFile(filePath, fileEncoding)
        db.WriteBatchToDB(batch, tableName, cur, conn)
        conn.commit()
    except Exception as e:
        # Best effort: undo the partial write, report, and continue with the
        # next file. The failed file stays out of the checkpoint so a later run
        # retries it.
        conn.rollback()
        print(e)
        print(f"Error while processing raw trip data: {filePath}. Rolling back any possible database action.")
    else:
        # Persist the checkpoint only after the commit succeeded.
        processedFiles.add(filePath)
        ProcessData.SavePickleCheckpoint(pickleFilePath, processedFiles)

## Creating indices for relevant fields in the database

Although this is somewhat outside the responsibilities of this notebook, these indices should ideally be created only after the data has been inserted.

In [9]:
# Index the cnpj column of RegularTrips. IF NOT EXISTS keeps the cell safe to
# re-run once the index has been created.
cnpjIndexSql = "CREATE INDEX IF NOT EXISTS cnpjIdx ON RegularTrips(cnpj)"
db.ExecuteQuery(cnpjIndexSql, cur, conn)
conn.commit()

In [10]:
# Index the plate column of RegularTrips. IF NOT EXISTS keeps the cell safe to
# re-run once the index has been created.
# NOTE(review): the raw CSV column is named 'placa'; confirm that the table
# column is really called 'plate'.
plateIndexSql = "CREATE INDEX IF NOT EXISTS plateIdx on RegularTrips(plate)"
db.ExecuteQuery(plateIndexSql, cur, conn)
conn.commit()

In [11]:
# Close the database connection; the batch writes and index statements above
# were each followed by a commit.
conn.close()