In [1]:
import pandas as pd
import json
import ast
import configparser
import pymysql

In [2]:
def pipeline(path, ingestion_ts=None):
    """
    Load a CSV file and turn it into rows ready for a MySQL ``executemany`` insert.

    Steps:
    1) clean the dataframe: missing values in text (non-numeric) columns become '',
       missing values in numeric columns become None so MySQL stores NULL
    2) append an ``ingestion_ts`` column holding one timestamp shared by all rows
    3) convert the dataframe into a list of row lists (CSV column order,
       followed by ``ingestion_ts``)

    Parameters
    ----------
    path : str or path-like
        CSV file to read with ``pd.read_csv``.
    ingestion_ts : pd.Timestamp or datetime, optional
        Timestamp stamped on every row. Defaults to ``pd.Timestamp.today()``
        (local, timezone-naive). Pass it explicitly for reproducible runs.

    Returns
    -------
    list of list
        One list per CSV row.
    """
    raw = pd.read_csv(path)

    numeric_cols = raw.select_dtypes(include="number").columns
    text_cols = raw.columns.difference(numeric_cols)
    # Text columns keep the '' placeholder for missing values.
    clean = raw.fillna({col: '' for col in text_cols})

    if ingestion_ts is None:
        ingestion_ts = pd.Timestamp.today()
    clean["ingestion_ts"] = ingestion_ts

    # Any NaN left at this point is in a numeric column. '' is not a valid value
    # for an INT column in MySQL strict mode, so send None (-> NULL) instead.
    return [
        [None if pd.isna(value) else value for value in row]
        for row in clean.values.tolist()
    ]

In [3]:
# CSV produced by the preparation step; its rows are converted into insert-ready lists.
path = "data/ready_for_ingestion/production_companies.csv"
production_companies_to_insert = pipeline(path=path)

In [7]:
# Path to the MySQL settings file. A forward slash works on both Windows and
# POSIX, and avoids the invalid "\m" escape sequence of a backslash literal.
MYSQL_CONFIG_PATH = "config/mysql.conf"

parser = configparser.ConfigParser()
# ConfigParser.read silently skips files it cannot open and returns the list of
# files it did read; check it so a missing file fails with a clear message
# instead of a NoSectionError on the first .get() call.
if not parser.read(MYSQL_CONFIG_PATH):
    raise FileNotFoundError(f"MySQL config file not found: {MYSQL_CONFIG_PATH}")
hostname = parser.get("mysql_config", "hostname")
username = parser.get("mysql_config", "username")
password = parser.get("mysql_config", "password")
dbname = parser.get("mysql_config", "database")
port = parser.get("mysql_config", "port")

# pymysql.connect raises on failure rather than returning None, so report the
# failure in the except branch and re-raise to stop later cells from running
# against a connection that does not exist.
try:
    conn = pymysql.connect(
        host=hostname,
        user=username,
        password=password,
        db=dbname,
        port=int(port)
    )
except pymysql.MySQLError:
    print("Error. Connection to MySQL cannot be established.")
    raise
print("Successfully connected to MySQL.")

cur = conn.cursor()

Successfully connected to MySQL.


In [8]:
# DDL for the raw landing table; IF NOT EXISTS makes re-running this statement harmless.
# NOTE(review): the schema `movies_raw` is hard-coded here, independently of the
# `database` value read from the config file and passed to pymysql.connect — confirm they agree.
create_table = """
CREATE TABLE IF NOT EXISTS movies_raw.production_companies (
    production_company_name VARCHAR(255), 
    production_company_id INT,
    movie_id INT,
    ingestion_ts TIMESTAMP
);
"""

# Parameterized INSERT (pymysql uses %s placeholders); values are escaped by the driver.
# Each row passed to executemany must supply exactly four values in this column order.
# NOTE(review): rows come from pipeline(), whose order follows the CSV header plus a trailing
# ingestion_ts — assumes the CSV columns are name, company id, movie id in that order; TODO confirm.
insert_query = """
INSERT INTO movies_raw.production_companies (production_company_name, production_company_id, movie_id, ingestion_ts)
VALUES (%s, %s, %s, %s);
"""

In [9]:
# Run the CREATE TABLE IF NOT EXISTS statement; the trailing ';' only hides the row-count display in Jupyter.
cur.execute(create_table);

0

In [10]:
# Insert all prepared rows in one transaction and commit only if the whole batch succeeds.
try:
    cur.executemany(insert_query, production_companies_to_insert)
    conn.commit()
except Exception:
    # Discard the uncommitted work so no partial batch stays pending, then surface the error.
    conn.rollback()
    raise
finally:
    # Always release the cursor and the connection, even when the insert fails.
    cur.close()
    conn.close()