# German Cities

### Import Packages

In [1]:
import pandas as pd
import sqlalchemy
import os
from dotenv import load_dotenv, find_dotenv
from functools import wraps
import datetime as dt

### Load variables from .env file

In [2]:
# Locate the nearest `.env` file and export its entries into the process
# environment; the DB_* settings used when writing to the database are read
# from os.environ later on.
load_dotenv(find_dotenv('.env'))

True

### Logging Wrapper

In [3]:
def log_step(func):
    """Decorate a pipeline step so each call reports its result shape and duration.

    The wrapped callable is invoked with the arguments it was given. Once it
    returns, one message is printed containing the callable's name, the
    ``.shape`` attribute of the returned object and the elapsed wall-clock
    time. The returned object is handed back to the caller untouched.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        started_at = dt.datetime.now()
        result = func(*args, **kwargs)
        elapsed = dt.datetime.now() - started_at
        # str(timedelta) renders as H:MM:SS.ffffff; the trailing "s" in the
        # message below is part of the established log format.
        time_taken = str(elapsed)
        print(f"{func.__name__}:\n shape={result.shape} took {time_taken}s\n")
        return result

    return wrapper

### Get Cities from CSV

In [4]:
# City list (presumably exported from OpenWeatherMap, going by the file name):
# one row per city with id, name, state, country and coordinates.
all_qwm_cities = pd.read_csv("../data/csv/owm_city_data.csv")
# Reference table that add_columns reads population figures from.
# It is read exactly once; a second identical read only repeats the parse.
world_cities = pd.read_csv("../data/csv/worldcities.csv")
all_qwm_cities.head(3)

  all_qwm_cities = pd.read_csv("../data/csv/owm_city_data.csv")


Unnamed: 0,id,name,state,country,coord.lon,coord.lat
0,833.0,Ḩeşār-e Sefīd,,IR,47.159401,34.330502
1,2960.0,‘Ayn Ḩalāqīm,,SY,36.321911,34.940079
2,3245.0,Taglag,,IR,44.98333,38.450001


## Data Cleaning Pipeline

### Init Pipeline

In [5]:
@log_step
def init_pipeline(df):
    """Start the pipeline on an independent deep copy of ``df``.

    Working on a copy keeps the frame passed in by the caller unchanged by
    any later step.
    """
    return df.copy(deep=True)

### Rename Columns

In [6]:
@log_step
def rename_columns(df):
    """Return a new frame whose source headers are mapped to ``city_*`` names.

    Columns not listed in the mapping keep their current name.
    """
    column_map = {
        "id": "city_id",
        "name": "city_name",
        "state": "city_state",
        "country": "city_country",
        "coord.lon": "city_longitude",
        "coord.lat": "city_latitude",
    }
    return df.rename(columns=column_map)

### Drop Columns

In [7]:
@log_step
def drop_columns(df):
    """Return ``df`` without the ``city_state`` column.

    Raises ``KeyError`` (from pandas) when the column is not present.
    """
    unwanted = ["city_state"]
    return df.drop(columns=unwanted)

### Add Columns

In [8]:
@log_step
def add_columns(df, population_source=None):
    """Add a join key, a load timestamp and population figures to ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame providing at least ``city_name`` and ``city_country``.
    population_source : pandas.DataFrame, optional
        Lookup frame with ``city_ascii``, ``iso2`` and ``population`` columns.
        When omitted, the notebook-level ``world_cities`` frame is used, which
        keeps ``df.pipe(add_columns)`` working as before while letting callers
        (and tests) pass the lookup explicitly instead of relying on kernel
        state.

    Returns
    -------
    pandas.DataFrame
        ``df`` plus ``municipality_country`` ("<name>,<country>"),
        ``created_at`` and ``city_pop``, re-indexed from 0. Rows with a missing
        value in any column are removed, which includes every city that found
        no match in the lookup.
    """
    if population_source is None:
        population_source = world_cities

    # Build the lookup: same "<name>,<country>" key as on the left-hand side.
    city_population = (
        population_source
        .assign(municipality_country=lambda x: x["city_ascii"] + "," + x["iso2"])
        .loc[:, ["municipality_country", "population"]]
        .rename(columns={"population": "city_pop"})
    )

    return (
        df
        .assign(municipality_country=lambda x: x["city_name"] + "," + x["city_country"])
        .assign(created_at=dt.datetime.now())
        # Name the join key explicitly rather than joining on whatever
        # columns the two frames happen to share.
        # NOTE(review): a key that occurs several times in the lookup yields
        # one output row per match; duplicates are expected to be collapsed
        # by a later step.
        .merge(city_population, how="left", on="municipality_country")
        .dropna()
        .reset_index(drop=True)
    )

### Drop Duplicates

In [9]:
@log_step
def drop_duplicates(df):
    """Keep only the first row for every ``municipality_country`` value.

    The surviving rows keep their original index labels.
    """
    key_columns = ["municipality_country"]
    return df.drop_duplicates(subset=key_columns, keep="first")

### Get German Cities

In [10]:
@log_step
def get_german_cities(df):
    """Return the rows whose ``city_country`` equals "DE", re-indexed from 0."""
    is_german = df["city_country"] == "DE"
    german_rows = df.loc[is_german]
    return german_rows.reset_index(drop=True)

### Adjust Datatypes

In [11]:

def adjust_datatypes(df):
    """Cast the city columns to their final dtypes and order by population.

    Unlike per-column assignment, the casts are done with ``DataFrame.astype``,
    which returns a new frame: the caller's ``df`` is left untouched, so the
    step can be re-run on the same input and behaves like the other pipeline
    steps.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``city_id``, ``city_name``, ``city_country``,
        ``city_longitude``, ``city_latitude``, ``municipality_country`` and
        ``city_pop``. Other columns are passed through unchanged.

    Returns
    -------
    pandas.DataFrame
        New frame sorted by ``city_pop`` in descending order, re-indexed
        from 0.

    Raises
    ------
    ValueError
        Propagated from pandas when ``city_id`` or ``city_pop`` holds values
        that cannot be cast to an integer (e.g. missing values).
    """
    final_dtypes = {
        "city_id": "string",
        "city_name": "string",
        "city_country": "string",
        "city_longitude": "float32",
        "city_latitude": "float32",
        "municipality_country": "string",
        "city_pop": "int",
    }
    return (
        df
        # Go through int64 first so an id stored as 833.0 becomes "833"
        # rather than "833.0".
        .astype({"city_id": "int64"})
        .astype(final_dtypes)
        .sort_values("city_pop", ascending=False)
        .reset_index(drop=True)
    )

### Send to DB

In [15]:
def send_to_DB(df, table_name, if_exists="replace"):
    """Write ``df`` to a MySQL table and make ``municipality_country`` its primary key.

    Connection settings are taken from the environment variables ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT`` and ``DB_SCHEMA``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to store; the index is not written.
    table_name : str
        Target table. The primary key is added to this table (not to a
        hard-coded one).
    if_exists : str, default "replace"
        Passed through to ``DataFrame.to_sql``.

    Returns
    -------
    pandas.DataFrame
        ``df`` itself, unchanged.

    Raises
    ------
    KeyError
        When one of the ``DB_*`` environment variables is not set.
    """
    con = f'mysql+pymysql://{os.environ["DB_USER"]}:{os.environ["DB_PASSWORD"]}@{os.environ["DB_HOST"]}:{os.environ["DB_PORT"]}/{os.environ["DB_SCHEMA"]}'
    # One engine serves both the bulk write and the DDL statement.
    engine = sqlalchemy.create_engine(con)
    try:
        df.to_sql(
            table_name,
            con=engine,
            if_exists=if_exists,
            index=False,
            dtype={
                'city_id': sqlalchemy.types.VARCHAR(length=30),
                'city_name': sqlalchemy.types.VARCHAR(length=40),
                'city_country': sqlalchemy.types.VARCHAR(length=40),
                'city_longitude': sqlalchemy.types.Float(precision=3, asdecimal=True),
                'city_latitude': sqlalchemy.types.Float(precision=3, asdecimal=True),
                'municipality_country': sqlalchemy.types.VARCHAR(length=100),
                'created_at': sqlalchemy.types.DateTime(),
                'city_pop': sqlalchemy.types.Integer()
            }
        )

        # A table that was appended to may already carry a primary key;
        # adding a second one would make the ALTER TABLE statement fail.
        existing_pk = sqlalchemy.inspect(engine).get_pk_constraint(table_name)
        if not existing_pk.get("constrained_columns"):
            # Backtick-quote the identifier, doubling any embedded backtick.
            quoted_table = "`" + table_name.replace("`", "``") + "`"
            # text() is needed because newer SQLAlchemy releases no longer
            # accept a plain string in execute().
            with engine.begin() as connection:
                connection.execute(
                    sqlalchemy.text(
                        f"ALTER TABLE {quoted_table} ADD PRIMARY KEY (`municipality_country`);"
                    )
                )
    finally:
        # Release pooled connections even when the write or the DDL fails.
        engine.dispose()
    return df


## Run Pipeline

In [13]:
# Run every cleaning step in order on the raw city frame. Each step takes a
# DataFrame and returns one, so the stages chain through DataFrame.pipe.
german_cities = (
    all_qwm_cities
        .pipe(init_pipeline)
        .pipe(rename_columns)
        .pipe(drop_columns)
        .pipe(add_columns) # adds population; cities without population data are dropped
        .pipe(drop_duplicates)
        .pipe(get_german_cities)
        .pipe(adjust_datatypes)
        # Writing to the database happens in the next cell.
)
# Summary of column dtypes and non-null counts of the final frame.
german_cities.info()

init_pipeline:
 shape=(209579, 6) took 0:00:00.013735s

rename_columns:
 shape=(209579, 6) took 0:00:00.011521s

drop_columns:
 shape=(209579, 5) took 0:00:00.007346s

add_columns:
 shape=(54793, 8) took 0:00:00.449862s

drop_duplicates:
 shape=(26649, 8) took 0:00:00.015666s

get_german_cities:
 shape=(2127, 8) took 0:00:00.003330s

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2127 entries, 0 to 2126
Data columns (total 8 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   city_id               2127 non-null   string        
 1   city_name             2127 non-null   string        
 2   city_country          2127 non-null   string        
 3   city_longitude        2127 non-null   float32       
 4   city_latitude         2127 non-null   float32       
 5   municipality_country  2127 non-null   string        
 6   created_at            2127 non-null   datetime64[ns]
 7   city_pop              2127 non-

In [14]:
# Store the cleaned German cities in the "cities" table, replacing any
# existing table of that name. The call returns the frame, which the notebook
# displays as the cell output.
send_to_DB(german_cities, "cities", if_exists="replace")

Unnamed: 0,city_id,city_name,city_country,city_longitude,city_latitude,municipality_country,created_at,city_pop
0,2950158,Berlin,DE,10.45000,54.033329,"Berlin,DE",2022-04-07 10:32:47.888142,3664088
1,2911298,Hamburg,DE,10.00000,53.549999,"Hamburg,DE",2022-04-07 10:32:47.888142,1852478
2,2867714,Munich,DE,11.57549,48.137428,"Munich,DE",2022-04-07 10:32:47.888142,1488202
3,2825297,Stuttgart,DE,9.17702,48.782318,"Stuttgart,DE",2022-04-07 10:32:47.888142,630305
4,2879139,Leipzig,DE,12.37129,51.339619,"Leipzig,DE",2022-04-07 10:32:47.888142,597215
...,...,...,...,...,...,...,...,...
2122,2820101,Unkel,DE,7.21888,50.596531,"Unkel,DE",2022-04-07 10:32:47.888142,5021
2123,2875978,Lonsee,DE,9.91999,48.543400,"Lonsee,DE",2022-04-07 10:32:47.888142,5010
2124,2884667,Krauchenwies,DE,9.25000,48.033329,"Krauchenwies,DE",2022-04-07 10:32:47.888142,5007
2125,2857075,Oerlenbach,DE,10.13333,50.150002,"Oerlenbach,DE",2022-04-07 10:32:47.888142,5004
