In [1]:
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.types import Integer, Date, Time,Text

import os
from time import time
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect_sqlalchemy import SqlAlchemyConnector

import pprint
from metabase import Metabase
from metabase_api import Metabase_API

import pipeline_set

In [2]:
pipeline_set.url_C

'https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv?accessType=DOWNLOAD'

In [3]:
getattr(pipeline_set, "url_C")

'https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv?accessType=DOWNLOAD'

In [4]:
def download_data(data_type):
    if data_type == "check":
        return("check", data_type)
    elif data_type == "metabase":
        return("metabase", data_type)
    elif data_type[:1] in ["C","V","P"] and data_type[1:] == " reload":
        data_type = data_type[:1]
        url = getattr(pipeline_set,f"url_{data_type}")
        csv_name = f"MVC_{data_type}.csv"
        os.system(f"wget {url} -O {csv_name}")
        return(csv_name, data_type)
    elif data_type in ["C","V","P"]:
        url = getattr(pipeline_set,f"url_{data_type}")
        csv_name = f"MVC_{data_type}.csv"
        if os.path.isfile(csv_name) is not True:
            os.system(f"wget {url} -O {csv_name}")
            return(csv_name, data_type)
        else:
            return(csv_name, data_type)
    else:
        return("err")

In [31]:
data_type = 'V'
download_data(data_type)

('MVC_P.csv', 'P')

In [5]:
def create_tables(csv_name:str, years:list , data_type:str):
    # Reads the csv
    df = pd.read_csv(csv_name, nrows = 1000, low_memory=False)
    # Selects specified columns
    df = df[getattr(pipeline_set,f"sel_{data_type}")]
    # Rename them 
    df.rename(columns=(getattr(pipeline_set,f"sel_rename_{data_type}")),inplace=True)
    # Convert to datetime type in datetime columns
    df.crash_date = pd.to_datetime(df.crash_date).dt.date
    df.crash_time = pd.to_datetime(df.crash_time,format= '%H:%M' ).dt.time

    # Creates empty tables in a database
    engine = create_engine('postgresql://root:root@localhost:5432/MVC_db') 
    
    for i in years:
        df.head(n=0).to_sql(name = f"MVC_{data_type}_{i}",con = engine, dtype=(getattr(pipeline_set,f"sel_types_{data_type}")),if_exists = 'replace')

    return engine

In [6]:
csv_name = 'MVC_P.csv'
years = [i for i in range(2017,2024)]
data_type = 'P'

create_tables(csv_name, years, data_type)

Engine(postgresql://root:***@localhost:5432/MVC_db)

In [10]:
def transform_and_load(years:list, csv_name:str, engine, data_type:str):
    
    total_rows=0
    total_rows_loaded = 0
    total_time = 0

    # Iterates through chunks of data from the CSV
    df_iter = pd.read_csv(csv_name, iterator = True, chunksize = 100000, low_memory=False)
    df = next(df_iter)
    
    while len(df) > 0:
        try:
            start_time = time()
            
            # Process data
            df = df[getattr(pipeline_set,f"sel_{data_type}")]
            df.rename(columns=(getattr(pipeline_set,f"sel_rename_{data_type}")),inplace=True)
            df.crash_date = pd.to_datetime(df.crash_date).dt.date
            df.crash_time = pd.to_datetime(df.crash_time,format= '%H:%M' ).dt.time
            
            total_rows += len(df)
            
            # Appends them to the tables
            for i in years:
                df_temp = df.loc[pd.DatetimeIndex(df.crash_date).year == i]
                df_temp.to_sql(name = f"MVC_{data_type}_{i}",con = engine, if_exists = 'append')
                
                total_rows_loaded += len(df_temp)
            

            end_time = time()
            total_time += (end_time - start_time)


            print(
            f"total rows processed = {total_rows}", 
            f"total rows loaded = {total_rows_loaded}", 
            'iteration took %.2f seconds' % (end_time - start_time),
            f"total time = %.2f seconds" % (total_time),
            "", sep = "\n")

            df = next(df_iter)

        except StopIteration:
            print("Finished ingesting data into the postgres database")
            break

In [11]:
csv_name = 'MVC_P.csv'
years = [i for i in range(2012,2024)]
data_type = 'P'
engine = create_engine('postgresql://root:root@localhost:5432/MVC_db')

transform_and_load(years, csv_name, engine, data_type)

total rows processed = 100000
total rows loaded = 100000
iteration took 14.25 seconds
total time = 14.25 seconds

total rows processed = 200000
total rows loaded = 200000
iteration took 13.16 seconds
total time = 27.41 seconds

total rows processed = 300000
total rows loaded = 300000
iteration took 12.70 seconds
total time = 40.11 seconds

total rows processed = 400000
total rows loaded = 400000
iteration took 12.64 seconds
total time = 52.75 seconds

total rows processed = 500000
total rows loaded = 500000
iteration took 12.83 seconds
total time = 65.59 seconds

total rows processed = 600000
total rows loaded = 600000
iteration took 13.51 seconds
total time = 79.10 seconds

total rows processed = 700000
total rows loaded = 700000
iteration took 13.34 seconds
total time = 92.44 seconds

total rows processed = 800000
total rows loaded = 800000
iteration took 12.28 seconds
total time = 104.73 seconds

total rows processed = 900000
total rows loaded = 900000
iteration took 13.16 seconds
t

In [12]:
def check_downloaded_data():
    engine = create_engine('postgresql://root:root@localhost:5432/MVC_db')

    res = [['year', 'Crashes','Vehicles','Person']]
    
    # Verifies data presence in the database and prints out summaries.
    for i in range(2012,2024):
        print(f"{i} spreadsheets check")
        temp = []
        temp.append(i)
        try:
            df = pd.read_sql_query('(SELECT COUNT(*) FROM "MVC_C_{}" )'.format(i),con=engine)
            t = int(df.get(key = 'count'))
        except:
            t = 0
        temp.append(t)
        try:
            df = pd.read_sql_query('(SELECT COUNT(*) FROM "MVC_V_{}" )'.format(i),con=engine)
            t = int(df.get(key = 'count'))
        except:
            t = 0
        temp.append(t)
        try:
            df = pd.read_sql_query('(SELECT COUNT(*) FROM "MVC_P_{}" )'.format(i),con=engine)
            t = int(df.get(key = 'count'))
        except:
            t = 0
        temp.append(t)
        res.append(temp)
    C,V,P =0,0,0
    st = '   Downloaded data report:' + '\n'  + '\n'
    for i in range(13):
        for j in range(4):
            if i == 0 or j == 0:
                h = res[i][j]
            else: 
                h = "{:,}".format(res[i][j])
            st += str(h).rjust(11)
        st += '\n'
        if i > 0:
            C += int(res[i][1])
            V += int(res[i][2])
            P += int(res[i][3])
    st = st + '\n' + 'total'.rjust(11) + str("{:,}".format(C)).rjust(11) + str("{:,}".format(V)).rjust(11) + str("{:,}".format(P)).rjust(11)
    print(st)

In [13]:
res = [['year', 'Crashes','Vehicles','Person']]
res[0][1]

'Crashes'

In [14]:
check_downloaded_data()

2012 spreadsheets check


  t = int(df.get(key = 'count'))
  t = int(df.get(key = 'count'))
  t = int(df.get(key = 'count'))


2013 spreadsheets check
2014 spreadsheets check
2015 spreadsheets check
2016 spreadsheets check
2017 spreadsheets check
2018 spreadsheets check
2019 spreadsheets check
2020 spreadsheets check
2021 spreadsheets check
2022 spreadsheets check
2023 spreadsheets check
   Downloaded data report:

       year    Crashes   Vehicles     Person
       2012    100,545    198,968     27,671
       2013    203,740    404,685     55,606
       2014    206,033    409,061     51,853
       2015    217,694    434,582     52,171
       2016    229,831    457,916    800,345
       2017    231,007    464,546    961,775
       2018    231,564    465,817    946,203
       2019    211,486    426,722    854,269
       2020    112,916    231,167    413,197
       2021    110,548    210,942    386,066
       2022    103,875          0    362,532
       2023     72,133          0    254,280

      total  2,031,372  3,704,406  5,165,968


In [71]:
def metabase(years:list):
    # Connection creation
    mblogin = 'root@gmail.com'
    mbpass = 'Aa123456@'
    try:
        mb = Metabase_API('http://localhost:3001/', mblogin, mbpass)
        print("connection ok")
    except:
        print("connection failed")
    
    try:
        colid = mb.get_item_id('collection', "MVC_collection")
        print("collection 'MVC_collection' exists")
        print(f'colid: {colid}')
    except:
        print('no collection')
    
    try:
        dbid = mb.get_item_id('database', "MVC_db")
        print("database ok")
        print(f'dbid: {dbid}')
    except:
        print("MVC_db not found:connect or rename database")        

In [72]:
years = [i for i in range(2012,2024)]
metabase(years)

connection ok
collection 'MVC_collection' exists
colid: 3
database ok
dbid: 2


In [40]:
mblogin = 'root@gmail.com'
mbpass = 'Aa123456@'
mb = Metabase_API('http://localhost:3001/', mblogin, mbpass)
colid = mb.get_item_id('collection', "MVC_collection")
print("collection 'MVC_collection' exists")

collection 'MVC_collection' exists


In [43]:
collection_id = mb.get_item_id('collection', "MVC_collection")

3

In [44]:
import requests

BASE_URL = "http://localhost:3001/"
login_payload = {
    "username": "root@gmail.com",
    "password": "Aa123456@"
}

response = requests.post(BASE_URL + "api/session", json=login_payload)
assert response.status_code == 200, "Failed to log in"

token = response.json()["id"]

In [46]:
headers = {
    "X-Metabase-Session": token
}

response = requests.get(BASE_URL + "api/collection", headers=headers)
collections = response.json()

collection_id = None
for collection in collections:
    if collection['name'] == "MVC_collection":
        collection_id = collection['id']
        break

In [48]:
response = requests.get(BASE_URL + f"api/collection/{collection_id}/items", headers=headers)
cards_in_collection = response.json()

In [57]:
cards_in_collection.keys()

dict_keys(['total', 'data', 'models', 'limit', 'offset'])

In [54]:
type(cards_in_collection)

dict

In [60]:
target_card = None
for card in cards_in_collection['data']:
    if card['model'] == 'card' and card['name'] == 'Activity, Count':
        target_card = card
        break

if target_card:
    with open('output.json', 'w') as f:
        json.dump(target_card, f, indent=4)

In [None]:
test_dash = {
    "description": null,
    "collection_position": null,
    "database_id": null,
    "name": "Activity, Count",
    "moderated_status": null,
    "fully_parametrized": true,
    "id": 1,
    "display": "scalar",
    "entity_id": "tk8rsT7qYHhGI3YWqLfu6",
    "collection_preview": true,
    "last-edit-info": {
        "id": 1,
        "last_name": null,
        "first_name": "root",
        "email": "root@gmail.com",
        "timestamp": "2023-10-08T17:00:04.89381Z"
    },
    "model": "card"
}