# Mongo etl

In [20]:
from pymongo import MongoClient
import pandas as pd
import numpy as np
from sqlalchemy import create_engine

pd.set_option('display.max_columns', None)

In [91]:
user = 'agung'
password = 'passwordmongo'
CONNECTION_STRING = f"mongodb+srv://{user}:{password}@cluster1.4jmnd4n.mongodb.net/?retryWrites=true&w=majority"

client = MongoClient(CONNECTION_STRING)

sample_training_mongo = client['sample_training']

zip_collection = sample_training_mongo['zips']
companies_collection = sample_training_mongo['companies']

## Zips

In [137]:
cursor_zips = zip_collection.find({}, {'_id': 0})
zips_df = pd.DataFrame.from_dict(list(cursor_zips))

In [138]:
# flatten loc column to x=long and y=lat
temp = pd.DataFrame(zips_df['loc'].tolist())
zips_df = pd.concat([zips_df, temp], axis=1)
zips_df.drop(['loc'], axis=1, inplace=True)
zips_df.rename(columns={'x': 'long', 'y': 'lat'}, inplace=True)
zips_df.head()

Unnamed: 0,city,zip,pop,state,lat,long
0,ALPINE,35014,3062,AL,33.331165,86.208934
1,BESSEMER,35020,40549,AL,33.409002,86.947547
2,ACMAR,35004,6055,AL,33.584132,86.51557
3,BAILEYTON,35019,1781,AL,34.268298,86.621299
4,HUEYTOWN,35023,39677,AL,33.414625,86.999607


## Companies

In [142]:
# Excludes all nested columns except offices
exc_cols = ['_id','offices','image', 'products', 'relationships', 'competitions', 'providerships', 'funding_rounds', 'investments','acquisition','acquisitions','milestones','video_embeds','screenshots','external_links','partners', 'ipo']

# get only the first office
cursor_comp = companies_collection.aggregate([
    {"$addFields": {
        "office": {"$first": "$offices"}
    }},
    {"$unset" : exc_cols}
], allowDiskUse=True)
companies_df = pd.DataFrame.from_dict(list(cursor_comp))

In [143]:
# fill empty office value with default empty office dict
empty_comp = {
    'description': '',
    'address1': '',
    'address2': '',
    'zip_code': '',
    'city': '',
    'state_code': '',
    'country_code': '',
    'latitude': None,
    'longitude': None
}

companies_df['office'] = np.where(companies_df['office'].notna(), companies_df['office'], empty_comp)

In [144]:
# flatten office column
temp = pd.DataFrame(companies_df['office'].tolist(), )
companies_df = pd.concat([companies_df, temp], axis=1, )
companies_df['office'] = companies_df['office'].apply(lambda x: 1 if x != None else 0)

# rename office data columns
office_col_rename = {
    'description': 'office_description',
    'address1': 'office_address1',
    'address2': 'office_address2',
    'zip_code': 'office_zip_code',
    'city': 'office_city',
    'state_code': 'office_state_code',
    'country_code': 'office_country_code',
    'latitude': 'office_latitude',
    'longitude': 'office_longitude'
}
companies_df.rename(columns=office_col_rename, inplace=True)
companies_df.head()

Unnamed: 0,name,permalink,crunchbase_url,homepage_url,blog_url,blog_feed_url,twitter_username,category_code,number_of_employees,founded_year,founded_month,founded_day,deadpooled_year,tag_list,alias_list,email_address,phone_number,office_description,created_at,updated_at,overview,total_money_raised,office,deadpooled_month,deadpooled_day,deadpooled_url,office_description.1,office_address1,office_address2,office_zip_code,office_city,office_state_code,office_country_code,office_latitude,office_longitude
0,Wetpaint,abc2,http://www.crunchbase.com/company/wetpaint,http://wetpaint-inc.com,http://digitalquarters.net/,http://digitalquarters.net/feed/,BachelrWetpaint,web,47.0,2005.0,10.0,17.0,1.0,"wiki, seattle, elowitz, media-industry, media-...",,info@wetpaint.com,206.859.6300,Technology Platform Company,2007-05-25 06:51:27,Sun Dec 08 07:15:44 UTC 2013,<p>Wetpaint is a technology platform company t...,$39.8M,1,,,,,710 - 2nd Avenue,Suite 1100,98104.0,Seattle,WA,USA,47.603122,-122.333253
1,Facebook,facebook,http://www.crunchbase.com/company/facebook,http://facebook.com,http://blog.facebook.com,http://blog.facebook.com/atom.php,facebook,social,5299.0,2004.0,2.0,1.0,,"facebook, college, students, profiles, network...",,,,Social network,Fri May 25 21:22:15 UTC 2007,Thu Nov 21 19:40:55 UTC 2013,<p>Facebook is the world&#8217;s largest socia...,$2.43B,1,,,,Headquarters,1601 Willow Road,,94025.0,Menlo Park,CA,USA,37.41605,-122.151801
2,Omnidrive,omnidrive,http://www.crunchbase.com/company/omnidrive,http://www.omnidrive.com,http://www.omnidrive.com/blog,http://feeds.feedburner.com/omnidrive,Nomadesk,network_hosting,,2005.0,11.0,1.0,2008.0,"storage, sharing, edit, online",,info@omnidrive.com,660-675-5052,,Sun May 27 03:25:32 UTC 2007,Tue Jul 02 22:48:04 UTC 2013,"<p>Currently in public beta, Omnidrive makes i...",$800k,1,9.0,15.0,,,Suite 200,654 High Street,94301.0,Palo Alto,CA,ISR,,
3,Twitter,twitter,http://www.crunchbase.com/company/twitter,http://twitter.com,http://twitter.com/blog,http://feeds.feedburner.com/TwitterBlog,twitter,social,1300.0,2006.0,3.0,21.0,,"text, messaging, social, community, twitter, t...",,press@twitter.com,,Real time communication platform,Fri Jun 01 08:42:34 UTC 2007,Sat Dec 07 16:07:56 UTC 2013,"<p>Created in 2006, Twitter is a global real-t...",$1.16B,1,,,,,1355 Market St.,,94103.0,San Francisco,CA,USA,37.776805,-122.416924
4,StumbleUpon,stumbleupon,http://www.crunchbase.com/company/stumbleupon,http://www.stumbleupon.com,http://www.stumbleupon.com/blog,,stumbleupon,web,,2002.0,2.0,1.0,,"content-discovery, find, content, stumble, too...",,,,Content discovery service,2007-06-01 09:28:13,Wed Mar 20 04:14:12 UTC 2013,<p>StumbleUpon is the easiest way to discover ...,$18.5M,1,,,,,,,,San Francisco,CA,USA,37.775196,-122.419204


In [145]:
companies_df[companies_df.office == 0]

Unnamed: 0,name,permalink,crunchbase_url,homepage_url,blog_url,blog_feed_url,twitter_username,category_code,number_of_employees,founded_year,founded_month,founded_day,deadpooled_year,tag_list,alias_list,email_address,phone_number,office_description,created_at,updated_at,overview,total_money_raised,office,deadpooled_month,deadpooled_day,deadpooled_url,office_description.1,office_address1,office_address2,office_zip_code,office_city,office_state_code,office_country_code,office_latitude,office_longitude


# Load to Postgres

In [146]:
#connection
password = 'Sukses37'
url = f'postgresql+psycopg2://postgres:{password}@localhost:5432/postgres'
engine = create_engine(url)

try:
    companies_df.to_sql('companies', index=False, con=engine, if_exists='replace')
    zips_df.to_sql('zips', index=False, con=engine, if_exists='replace')
    print(f"Mongo Data has been load to Postgres Database Successfully")
except Exception as e:
    print(f"Mongo ETL is Failed")
    print(f"[ERROR]: {e}")

Mongo Data has been load to Postgres Database Successfully


---

### backup csv to postgres

In [2]:
from sqlalchemy import create_engine
import pandas as pd


In [46]:
application_test = pd.read_csv('../resources/application_test.csv')
application_train = pd.read_csv('../resources/application_train.csv')

In [47]:
#connection
password = 'Sukses37'
url = f'postgresql+psycopg2://postgres:{password}@localhost:5432/postgres'
engine = create_engine(url)

try:
    application_test.to_sql('home_credit_default_risk_application_test', index=False, con=engine, if_exists='replace')
    application_train.to_sql('home_credit_default_risk_application_train', index=False, con=engine, if_exists='replace')
    print(f"Data has been load to Postgres Database Successfully")
except Exception as e:
    print(f"Mongo ETL is Failed")
    print(f"[ERROR]: {e}")

Data has been load to Postgres Database Successfully


---

# MySQL ETL

In [5]:
from sqlalchemy import create_engine
import findspark
findspark.init()
from pyspark.sql import SparkSession
# from pyspark.sql.functions import *
# from pyspark.sql.types import *

In [6]:
# initiate  spark
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("ds9_final_project") \
    .getOrCreate()
    # .config("spark.jars", "C:/Program Files (x86)/MySQL/Connector J 8.0/mysql-connector-java-8.0.28.jar") \
    # .config("spark.driver.extraClassPath", "C:/Program Files (x86)/MySQL/Connector J 8.0/mysql-connector-java-8.0.28.jar") \

In [8]:
application_test = spark.read. \
    format("csv"). \
    option("inferSchema","true"). \
    option("header","true"). \
    load("../resources/application_test.csv")
    
application_train = spark.read. \
    format("csv"). \
    option("inferSchema","true"). \
    option("header","true"). \
    load("../resources/application_train.csv")

In [10]:
application_test.show(3)

+----------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+----------------+--------------------+------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+------------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+-----------------+------------------+------------------+------------------+--------------+----------------+---------------------------+---------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+--------------+-----------------------+-------

In [9]:
application_train.show(3)

+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+----------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+--------------------+-------------------+------------------+-------------------+--------------+----------------+---------------------------+------------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+--------------+-----------------

In [46]:
password = 'Sukses37'
application_train.write.format('jdbc').options(
      url='jdbc:mysql://localhost:3306/digital_skola',
      driver='com.mysql.jdbc.Driver',
      dbtable='home_credit_default_risk_application_train',
      user='root',
      password=password).mode('overwrite').save()

application_test.write.format('jdbc').options(
      url='jdbc:mysql://localhost:3306/digital_skola',
      driver='com.mysql.jdbc.Driver',
      dbtable='home_credit_default_risk_application_test',
      user='root',
      password=password).mode('overwrite').save()

Py4JJavaError: An error occurred while calling o219.save.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:101)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:101)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:101)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:218)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:222)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:46)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


In [19]:
x = "An error occurred while calling o89.save.\n: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver\r\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\r\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:418)\r\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:351)\r\n\tat org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)\r\n\tat org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:101)\r\n\tat org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:101)\r\n\tat scala.Option.foreach(Option.scala:407)\r\n\tat org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:101)\r\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:218)\r\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:222)\r\n\tat org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:46)\r\n\tat org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)\r\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)\r\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)\r\n\tat org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)\r\n\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)\r\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)\r\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)\r\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)\r\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)\r\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)\r\n\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)\r\n\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)\r\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)\r\n\tat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)\r\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)\r\n\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)\r\n\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)\r\n\tat org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)\r\n\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)\r\n\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)\r\n\tat org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)\r\n\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)\r\n\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)\r\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)\r\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\r\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\r\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\r\n\tat java.lang.reflect.Method.invoke(Method.java:498)\r\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\r\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\r\n\tat py4j.Gateway.invoke(Gateway.java:282)\r\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\r\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\r\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\r\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\r\n\tat java.lang.Thread.run(Thread.java:748)\r\n"
print(x)

An error occurred while calling o89.save.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:101)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:101)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:101)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:218)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:222)
	at org.apache.spark.sql.execution.datasources.jdbc.

# Mysql to Postgres

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [30]:
# initiate  spark
spark = SparkSession \
    .builder \
    .config("spark.jars", "../drivers/mysql-connector-java-8.0.28.jar") \
    .config("spark.driver.extraClassPath", "../drivers/mysql-connector-java-8.0.28.jar/mysql-connector-java-8.0.28.jar") \
    .master("local") \
    .appName("ds9_final_project") \
    .getOrCreate()

In [31]:
# Read from MySql
application_train = spark.read.format('jdbc').options(
      url='jdbc:mysql://localhost:3306/digitalSkola',
      driver='com.mysql.jdbc.Driver',
      dbtable='home_credit_default_risk_application_train',
      user='root',
      password='anypassword').read()

application_test = spark.read.format('jdbc').options(
      url='jdbc:mysql://localhost:3306/digitalSkola',
      driver='com.mysql.jdbc.Driver',
      dbtable='home_credit_default_risk_application_test',
      user='root',
      password='anypassword').read()

AttributeError: 'DataFrameReader' object has no attribute 'read'

In [None]:
# Load data to Postgresql
application_train.write.format('jdbc').options(
      url='jdbc:postgresql://localhost:3306/digitalSkola',
      driver='org.postgresql.Driver',
      dbtable='home_credit_default_risk_application_train',
      user='root',
      password='anypassword').mode('overwrite').save()

application_test.write.format('jdbc').options(
      url='jdbc:postgresql://localhost:3306/digitalSkola',
      driver='org.postgresql.Driver',
      dbtable='home_credit_default_risk_application_test',
      user='root',
      password='anypassword').mode('overwrite').save()

---

# Streaming Currency API to Postgres

In [154]:
import requests
import pandas as pd
from datetime import datetime, timedelta, date

In [33]:
import base64
import json
def get_stream(url):
    s = requests.Session()

    with s.get(url, headers=None, stream=True) as resp:
        for line in resp.iter_lines(decode_unicode=True):
            if line:
                print(line)
                print(type(line))
                line = json.loads(line)
                print(line)


url = 'https://www.freeforexapi.com/api/live?pairs=EURUSD,EURGBP,USDEUR'
get_stream(url)

{"rates":{"EURUSD":{"rate":1.054096,"timestamp":1670227923},"EURGBP":{"rate":0.857992,"timestamp":1670227923},"USDEUR":{"rate":0.94868,"timestamp":1670227923}},"code":200}
<class 'str'>
{'rates': {'EURUSD': {'rate': 1.054096, 'timestamp': 1670227923}, 'EURGBP': {'rate': 0.857992, 'timestamp': 1670227923}, 'USDEUR': {'rate': 0.94868, 'timestamp': 1670227923}}, 'code': 200}


script to subscribe topic and send data to postgresql (add details currency)  
EURUSD=US Dollar, EURGBP=Pound Sterling, USDEUR=Euro  
Format = {‘currency_id’, ‘rate’, ‘timestamp’, ‘currency_name’}

In [9]:
response = {
    "rates": {
        "EURUSD": {
            "rate": 1.058257, 
            "timestamp": 1670214722
        }, 
        "EURGBP": {"rate": 0.857315, "timestamp": 1670214722}, 
        "USDEUR": {"rate": 0.94495, "timestamp": 1670214722}
    }, 
    "code": 200
}
res_data = response['rates']

currencies = {'EURUSD': 'US Dollar', 'EURGBP': 'Pound Sterling', 'USDEUR': 'Euro'}
for curr, curr_name in currencies.items():

    data = {}
    data['currency_id'] = curr
    data['rate'] = res_data[curr]['rate']
    data['timestamp'] = datetime.fromtimestamp( res_data[curr]['timestamp'] )  
    data['currency_name'] = curr_name
    


{'currency_id': 'EURUSD', 'rate': 1.058257, 'timestamp': datetime.datetime(2022, 12, 5, 11, 32, 2), 'currency_name': 'US Dollar'}
{'currency_id': 'EURGBP', 'rate': 0.857315, 'timestamp': datetime.datetime(2022, 12, 5, 11, 32, 2), 'currency_name': 'Pound Sterling'}
{'currency_id': 'USDEUR', 'rate': 0.94495, 'timestamp': datetime.datetime(2022, 12, 5, 11, 32, 2), 'currency_name': 'Euro'}


In [26]:
from random import SystemRandom
(3 * SystemRandom().random())

0.727139881173635

In [2]:
def insert_data(data: dict, tablename: str, col_names: list[str]):

    print(f"INSERT INTO {tablename} VALUES (:page_type, :page_url, :user_id, :session_id, :event_timestamp)")
    print(data)
    print(col_names)

data = {
    'name': 'agung',
    'age': 28,
    'now': datetime.now()
}

col_names = ['name', 'age', 'now']
insert_data(data, 'tbl_user', col_names)

INSERT INTO tbl_user VALUES (:page_type, :page_url, :user_id, :session_id, :event_timestamp)
{'name': 'agung', 'age': 28, 'now': datetime.datetime(2022, 12, 5, 11, 15, 26, 285290)}
['name', 'age', 'now']


---

# DataWarehouse Processing (Spark)

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
from sqlalchemy import create_engine

Create dim country (id, country_code)   
Create dim state (id, country_id, state_code)   
Create dim city (id, state_id, city_name, zip_code)  
Create dim currency (id, currency_name, currency_code)  

In [2]:
engine = create_engine("postgresql://postgres:Sukses37@localhost:5432/postgres")

In [3]:
zips = pd.read_sql(f"select * from zips", con=engine)
companies = pd.read_sql(f"select * from companies", con=engine)
currencies = pd.read_sql(f"select * from currencies", con=engine)

In [122]:
zips.columns

Index(['city', 'zip', 'pop', 'state', 'lat', 'long'], dtype='object')

In [123]:
companies.columns

Index(['name', 'permalink', 'crunchbase_url', 'homepage_url', 'blog_url',
       'blog_feed_url', 'twitter_username', 'category_code',
       'number_of_employees', 'founded_year', 'founded_month', 'founded_day',
       'deadpooled_year', 'tag_list', 'alias_list', 'email_address',
       'phone_number', 'office_description', 'created_at', 'updated_at',
       'overview', 'total_money_raised', 'deadpooled_month', 'deadpooled_day',
       'deadpooled_url', 'office_address1', 'office_address2',
       'office_zip_code', 'office_city', 'office_state_code',
       'office_country_code', 'office_latitude', 'office_longitude'],
      dtype='object')

## Dim Currencies

In [4]:
currencies.head(2)

Unnamed: 0,id,currency_code,currency_name,rate,timestamp
0,2.0,EURUSD,US Dollar,1.052776,1670236622
1,3.0,EURGBP,Pound Sterling,0.859086,1670236622


In [11]:
cols = ['currency_code', 'currency_name']
dim_currencies = currencies[cols].groupby(cols).count().reset_index().reset_index()
dim_currencies.rename(columns={'index': 'currency_id'}, inplace=True)
dim_currencies['currency_id'] = dim_currencies['currency_id'] + 1
dim_currencies

Unnamed: 0,currency_id,currency_code,currency_name
0,1,EURGBP,Pound Sterling
1,2,EURUSD,US Dollar
2,3,USDEUR,Euro


In [12]:
try:
    res = dim_currencies.to_sql('dim_currencies', con=engine, schema='dwh', if_exists='replace', index=False)
    print(f'success insert data to table: dim_currencies, inserted {res} data')
except Exception as e:
    print('Failed to insert data to table: dim_currencies')
    print(f'ERROR: {e}')

success insert data to table: dim_currencies, inserted 3 data


## Dim Country

In [13]:
companies.columns
cols = ['office_country_code']
dim_country = companies[cols].groupby(cols).count().reset_index().reset_index()
dim_country = dim_country.rename(columns={"index":"country_id"})
dim_country = dim_country[dim_country['office_country_code'] != '']
dim_country.head(2)

Unnamed: 0,country_id,office_country_code
1,1,AFG
2,2,ALB


In [17]:
try:
    res = dim_country.to_sql('dim_country', con=engine, schema='dwh', index=False, if_exists='replace')
    print(f'success insert data to table: dim_country, inserted {res} data')
except Exception as e:
    print('Failed to insert data to table: dim_country')
    print(f'ERROR: {e}')

success insert data to table: dim_country, inserted 92 data


## Dim State

In [22]:
zips.head(3)

Unnamed: 0,city,zip,pop,state,lat,long
0,ALPINE,35014,3062,AL,33.331165,86.208934
1,BESSEMER,35020,40549,AL,33.409002,86.947547
2,ACMAR,35004,6055,AL,33.584132,86.51557


In [21]:
companies.head(3)

Unnamed: 0,name,permalink,crunchbase_url,homepage_url,blog_url,blog_feed_url,twitter_username,category_code,number_of_employees,founded_year,founded_month,founded_day,deadpooled_year,tag_list,alias_list,email_address,phone_number,office_description,created_at,updated_at,overview,total_money_raised,office,deadpooled_month,deadpooled_day,deadpooled_url,office_address1,office_address2,office_zip_code,office_city,office_state_code,office_country_code,office_latitude,office_longitude
0,Wetpaint,abc2,http://www.crunchbase.com/company/wetpaint,http://wetpaint-inc.com,http://digitalquarters.net/,http://digitalquarters.net/feed/,BachelrWetpaint,web,47.0,2005.0,10.0,17.0,1.0,"wiki, seattle, elowitz, media-industry, media-...",,info@wetpaint.com,206.859.6300,,2007-05-25 06:51:27,Sun Dec 08 07:15:44 UTC 2013,<p>Wetpaint is a technology platform company t...,$39.8M,1,,,,710 - 2nd Avenue,Suite 1100,98104,Seattle,WA,USA,47.603122,-122.333253
1,Facebook,facebook,http://www.crunchbase.com/company/facebook,http://facebook.com,http://blog.facebook.com,http://blog.facebook.com/atom.php,facebook,social,5299.0,2004.0,2.0,1.0,,"facebook, college, students, profiles, network...",,,,Headquarters,Fri May 25 21:22:15 UTC 2007,Thu Nov 21 19:40:55 UTC 2013,<p>Facebook is the world&#8217;s largest socia...,$2.43B,1,,,,1601 Willow Road,,94025,Menlo Park,CA,USA,37.41605,-122.151801
2,Omnidrive,omnidrive,http://www.crunchbase.com/company/omnidrive,http://www.omnidrive.com,http://www.omnidrive.com/blog,http://feeds.feedburner.com/omnidrive,Nomadesk,network_hosting,,2005.0,11.0,1.0,2008.0,"storage, sharing, edit, online",,info@omnidrive.com,660-675-5052,,Sun May 27 03:25:32 UTC 2007,Tue Jul 02 22:48:04 UTC 2013,"<p>Currently in public beta, Omnidrive makes i...",$800k,1,9.0,15.0,,Suite 200,654 High Street,94301,Palo Alto,CA,ISR,,


In [None]:
# set id of country
cols = ['office_country_code']
dim_country = zips[cols].groupby(cols).count().reset_index().reset_index()
dim_country = dim_country.rename(columns={"index":"state_id"})
dim_country['state_id'] = dim_country['state_id'] + 1 

In [24]:
cols = ['state']
dim_state = zips[cols].groupby(cols).count().reset_index().reset_index()
dim_state = dim_state.rename(columns={"index":"state_id"})
dim_state['state_id'] = dim_state['state_id'] + 1 
# dim_state = dim_state[dim_state['office_country_code'] != '']
# dim_state = dim_state.merge(dim_country, on='office_country_code').drop(columns='office_country_code')
# dim_state = dim_state[dim_state['office_state_code'] != '']
dim_state.head(2)

Unnamed: 0,state_id,state
0,1,AK
1,2,AL


In [132]:
try:
    res = dim_state.to_sql('dim_state', con=engine, schema='dwh', index=False, if_exists='replace')
    print(f'success insert data to table: dim_state, inserted {res} data')
    res = dim_state.to_sql('dim_state', con=engine, schema='dwh', index=False, if_exists='replace')
    print(f'success insert data to table: dim_state, inserted {res} data')
except Exception as e:
    print('Failed to insert data to table: dim_state')
    print(f'ERROR: {e}')

success insert data to table: dim_state, inserted 68 data


## Dim City

In [133]:
zips.columns
cols = ['state','city', 'zip']
dim_city = zips[cols].groupby(cols).count().reset_index()
dim_city = dim_city.merge(dim_state, left_on='state', right_on='office_state_code').drop(columns=['office_state_code','state_id','country_id'])
dim_city = dim_city.groupby(['state', 'city', 'zip']).count().reset_index().reset_index()
dim_city = dim_city[(dim_city.city != '') | (dim_city.zip != '')]
dim_city = dim_city.rename(columns={"index":"city_id"})
dim_city['city_id'] = dim_city['city_id'] + 1
dim_city.head(2)

Unnamed: 0,city_id,state,city,zip
0,1,AL,ABBEVILLE,36310
1,2,AL,ACMAR,35004


In [134]:
try:
    res = dim_city.to_sql('dim_city', con=engine, schema='dwh', index=False, if_exists='replace')
    print(f'success insert data to table: dim_city, inserted {res} data')
except Exception as e:
    print('Failed to insert data to table: dim_city')
    print(f'ERROR: {e}')

success insert data to table: dim_city, inserted 351 data


# Fact Tables

In [3]:
import pandas as pd
from datetime import datetime, timedelta, date
from sqlalchemy import create_engine

### Create fact total city & office per state

In [135]:
companies.columns

Index(['name', 'permalink', 'crunchbase_url', 'homepage_url', 'blog_url',
       'blog_feed_url', 'twitter_username', 'category_code',
       'number_of_employees', 'founded_year', 'founded_month', 'founded_day',
       'deadpooled_year', 'tag_list', 'alias_list', 'email_address',
       'phone_number', 'office_description', 'created_at', 'updated_at',
       'overview', 'total_money_raised', 'deadpooled_month', 'deadpooled_day',
       'deadpooled_url', 'office_address1', 'office_address2',
       'office_zip_code', 'office_city', 'office_state_code',
       'office_country_code', 'office_latitude', 'office_longitude'],
      dtype='object')

In [151]:
fct_agg_state = companies.groupby('office_state_code').agg({'office_city': 'count', 'office': 'count'}).reset_index()
fct_agg_state = fct_agg_state[fct_agg_state['office_state_code'] != '']
fct_agg_state.head()

Unnamed: 0,office_state_code,office_city,office
1,AL,13,13
2,AR,5,5
3,AZ,53,53
4,CA,1723,1726
5,CO,91,91


In [152]:
try:
    res = fct_agg_state.to_sql('fct_city_office_per_state', con=engine, schema='dwh', index=False, if_exists='replace')
    print(f'success insert data to table: fct_agg_state, inserted {res} data')
except Exception as e:
    print('Failed to insert data to table: fct_agg_state')
    print(f'ERROR: {e}')

success insert data to table: fct_agg_state, inserted 47 data


### Create fact currency daily avg

In [41]:
currencies = pd.read_sql(f"select * from currencies", con=engine)
currencies.head(3)

Unnamed: 0,id,currency_code,currency_name,rate,timestamp
0,2.0,EURUSD,US Dollar,1.052776,1670236622
1,3.0,EURGBP,Pound Sterling,0.859086,1670236622
2,4.0,USDEUR,Euro,0.94987,1670236622


In [13]:
# change datetime to timestamp
date_ = date.today()
# date_ = date(2022,12,6)
today = datetime(date_.year, date_.month, date_.day)
yesterday = today - timedelta(days=1)
today = datetime.timestamp(today)
yesterday = datetime.timestamp(yesterday)
print(yesterday, today)

1670173200.0 1670259600.0


In [26]:
fct_currency_daily = currencies[(currencies.timestamp >= yesterday) & (currencies.timestamp < today)]
fct_currency_daily = fct_currency_daily.groupby('currency_code').agg({'rate': 'mean'}).reset_index()
fct_currency_daily.rename(columns={'rate':'daily_avg_rate'}, inplace=True)
fct_currency_daily.head()

Unnamed: 0,currency_code,daily_avg_rate
0,EURGBP,0.860072
1,EURUSD,1.057013
2,USDEUR,0.946066


In [27]:
# Load to dwh
try:
    res = fct_currency_daily.to_sql('fct_currency_daily', con=engine, schema='dwh', index=False, if_exists='replace')
    print(f'success insert data to table: fct_currency_daily, inserted {res} data')
except Exception as e:
    print('Failed to insert data to table: fct_currency_daily')
    print(f'ERROR: {e}')

success insert data to table: fct_currency_daily, inserted 3 data


### Fact table for currencies monthly

In [32]:
# change datetime to timestamp
date_ = date.today() - timedelta(days=date.today().day)
# date_ = date(2022,12,6)
today = datetime(date_.year, date_.month, date_.day)
yesterday = today.replace(day=1)
today = datetime.timestamp(today)
yesterday = datetime.timestamp(yesterday)
print(yesterday, today)

1667235600.0 1669741200.0


In [42]:
fct_currency_monthly = currencies[(currencies.timestamp >= yesterday) & (currencies.timestamp < today)]
fct_currency_monthly = fct_currency_monthly.groupby('currency_code').agg({'rate': 'mean'}).reset_index()
fct_currency_monthly.rename(columns={'rate':'monthly_avg_rate'}, inplace=True)
fct_currency_monthly.head()

Unnamed: 0,currency_code,monthly_avg_rate
0,EURGBP,0.859517
1,EURUSD,1.053029
2,USDEUR,0.949641


In [43]:
try:
    res = fct_currency_monthly.to_sql('fct_currency_monthly', con=engine, schema='dwh', index=False, if_exists='replace')
    print(f'success insert data to table: fct_currency_monthly, inserted {res} data')
except Exception as e:
    print('Failed to insert data to table: fct_currency_monthly')
    print(f'ERROR: {e}')

success insert data to table: fct_currency_monthly, inserted 3 data


# Preprocessing Data

In [71]:
import pandas as pd
from datetime import datetime, timedelta, date
from sqlalchemy import create_engine

from sklearn.model_selection import train_test_split
from imblearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

from sklearn.metrics import classification_report
import logging

In [111]:
application_train = pd.read_sql("select * from home_credit_default_risk_application_train", con=engine)
application_test = pd.read_sql("select * from home_credit_default_risk_application_test", con=engine)

Check train data label distribution  
Check the null column. If > 60% null, drop column.  
one hot encoding categorical data  
impute null values with median  
Store data to table:  
home_credit_default_risk_application_test_clean  
home_credit_default_risk_application_train_clean  

In [112]:
# drop column that has > 60% null values
def drop_isna_cols(df):
    isna_cols = df.isna().sum().reset_index()
    isna_cols.rename(columns={0:'isna_sum'}, inplace=True)
    isna_cols = isna_cols[isna_cols.isna_sum > (0.6 * len(df))]['index'].tolist()
    df.drop(columns=isna_cols, inplace=True)
    return df

application_train = drop_isna_cols(application_train)
application_test = drop_isna_cols(application_test)

In [113]:
# one hot encoding categorical columns
application_train = pd.get_dummies(application_train)
application_test = pd.get_dummies(application_test)

In [114]:
# input null values with median
application_train = application_train.fillna(application_train.median())
application_test = application_test.fillna(application_test.median())

In [85]:
# load clean data to database
# try:
#     res_train = application_train.to_sql('home_credit_default_risk_application_train_clean', con=engine, if_exists='replace')
#     res_test = application_test.to_sql('home_credit_default_risk_application_test_clean', con=engine, if_exists='replace')
#     logging.info(f'success insert data to table: home_credit_default_risk_application_train_clean, inserted {res_train} data')
#     logging.info(f'success insert data to table: home_credit_default_risk_application_test_clean, inserted {res_test} data')
# except Exception as e:
#     logging.info('Failed to insert data to table: fct_currency_daily')
#     logging.error(e)

In [116]:
train_labels = application_train['TARGET']

# Align the training and testing data, keep only columns present in both dataframes
application_train, application_test = application_train.align(application_test, join = 'inner', axis = 1)

# drop target column
application_train = application_train.drop(columns = ['TARGET'])

In [117]:
from sklearn.preprocessing import MinMaxScaler

# Scale each feature to 0-1
scaler = MinMaxScaler(feature_range = (0, 1))
scaler.fit(application_train)
train = scaler.transform(application_train)
test = scaler.transform(application_test)

In [118]:
model_logr = LogisticRegression()

# Train on the training data
model_logr.fit(train, train_labels)

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


In [119]:
# Make predictions
# Make sure to select the second column only
log_reg_pred = model_logr.predict_proba(test)[:, 1]

In [120]:
# Prediction dataframe
log_reg_pred_df = application_test[['SK_ID_CURR']]
log_reg_pred_df['TARGET'] = log_reg_pred

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  log_reg_pred_df['TARGET'] = log_reg_pred


In [122]:
# Load prediction data to database
try:
    res = log_reg_pred_df.to_sql('home_credit_default_risk_log_reg_pred_df_clean', con=engine, if_exists='replace')
    logging.info(f'success insert data to table: home_credit_default_risk_log_reg_pred_df_clean, inserted {res} data')
except Exception as e:
    logging.info('Failed to insert data to table: log_reg_pred_df')
    logging.error(e)