Import dependencies and set `curpath` to the working directory (the dataset itself is read in a later cell)

In [1]:
import sys
import os
import pymysql
import inspect
import pandas as pd
import sqlalchemy as sql

# `filename` is the name the interpreter assigned to the currently executing
# code. Inside a notebook this is not a real file on disk (depending on the
# kernel version it is a pseudo-name or a temporary path), so it is kept only
# for backward compatibility and is no longer used to derive `curpath`.
filename = inspect.getframeinfo(inspect.currentframe()).filename

# Base directory that relative data paths are resolved against: the kernel's
# current working directory.
curpath = os.getcwd()

Create the SparkContext, SQLContext and SparkSession

In [2]:
# Creating Spark Context
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

# Build (or reuse) a local Spark session. `getOrCreate()` returns the already
# running session when this cell is executed again, so the cell is safe to
# re-run without restarting the kernel (constructing SparkContext("local")
# directly raises if a context already exists).
spark = SparkSession.builder.master("local").getOrCreate()

# Expose the session's underlying context and a SQLContext under the names
# used by the rest of the notebook.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

Load dataset from csv, using the "Expedia Hotel Recommendations" Kaggle dataset: https://www.kaggle.com/c/expedia-hotel-recommendations/overview

In [3]:
# Read the Kaggle training CSV (comma separated, first row is the header) and
# let Spark infer each column's type from the data.
dfExpedia = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load(curpath + '/expedia-hotel-recommendations/train.csv')
)

# Register the DataFrame as a temporary view so it can be queried with SQL
dfExpedia.createOrReplaceTempView('dfExpedia')

In [4]:
# Draw a fixed-seed random sample (fraction 0.014, without replacement) so the
# same subset is selected on every run.
dfExpediaSample = dfExpedia.sample(
    withReplacement=False,
    fraction=0.014,
    seed=8,
)

Preview the schema inferred when the dataset was read in

In [5]:
# Print the column names, inferred types and nullability of the sampled DataFrame
dfExpediaSample.printSchema()

root
 |-- date_time: timestamp (nullable = true)
 |-- site_name: integer (nullable = true)
 |-- posa_continent: integer (nullable = true)
 |-- user_location_country: integer (nullable = true)
 |-- user_location_region: integer (nullable = true)
 |-- user_location_city: integer (nullable = true)
 |-- orig_destination_distance: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- is_mobile: integer (nullable = true)
 |-- is_package: integer (nullable = true)
 |-- channel: integer (nullable = true)
 |-- srch_ci: timestamp (nullable = true)
 |-- srch_co: timestamp (nullable = true)
 |-- srch_adults_cnt: integer (nullable = true)
 |-- srch_children_cnt: integer (nullable = true)
 |-- srch_rm_cnt: integer (nullable = true)
 |-- srch_destination_id: integer (nullable = true)
 |-- srch_destination_type_id: integer (nullable = true)
 |-- is_booking: integer (nullable = true)
 |-- cnt: integer (nullable = true)
 |-- hotel_continent: integer (nullable = true)
 |-- hotel_country: 

In MySQL Workbench, create a `train` table with the same column names as the dataset (the statements below declare every column as TEXT)

In [6]:
# Reference SQL, kept commented out: it is meant to be run manually in MySQL
# Workbench, not executed by this notebook.
# It drops any existing `expedia` database, recreates it, and builds the
# `train` table one column at a time. Every column is declared as TEXT.

# DROP DATABASE IF EXISTS `expedia`;
# CREATE DATABASE `expedia`;
# USE `expedia`;

# CREATE TABLE train (date_time TEXT);
# ALTER TABLE train ADD COLUMN site_name TEXT;
# ALTER TABLE train ADD COLUMN posa_continent TEXT;
# ALTER TABLE train ADD COLUMN user_location_country TEXT;
# ALTER TABLE train ADD COLUMN user_location_region TEXT;
# ALTER TABLE train ADD COLUMN user_location_city TEXT;
# ALTER TABLE train ADD COLUMN orig_destination_distance TEXT;
# ALTER TABLE train ADD COLUMN user_id TEXT;
# ALTER TABLE train ADD COLUMN is_mobile TEXT;
# ALTER TABLE train ADD COLUMN is_package TEXT;
# ALTER TABLE train ADD COLUMN `channel` TEXT;
# ALTER TABLE train ADD COLUMN srch_ci TEXT;
# ALTER TABLE train ADD COLUMN srch_co TEXT;
# ALTER TABLE train ADD COLUMN srch_adults_cnt TEXT;
# ALTER TABLE train ADD COLUMN srch_children_cnt TEXT;
# ALTER TABLE train ADD COLUMN srch_rm_cnt TEXT;
# ALTER TABLE train ADD COLUMN srch_destination_id TEXT;
# ALTER TABLE train ADD COLUMN srch_destination_type_id TEXT;
# ALTER TABLE train ADD COLUMN is_booking TEXT;
# ALTER TABLE train ADD COLUMN cnt TEXT;
# ALTER TABLE train ADD COLUMN hotel_continent TEXT;
# ALTER TABLE train ADD COLUMN hotel_country TEXT;
# ALTER TABLE train ADD COLUMN hotel_market TEXT;
# ALTER TABLE train ADD COLUMN hotel_cluster TEXT;

Convert Spark DataFrame to Pandas DataFrame

In [7]:
# Convert the sampled Spark DataFrame into a pandas DataFrame
dfExpediaPandas = dfExpediaSample.toPandas()

In [8]:
# Preview the first 8 rows of the converted DataFrame
dfExpediaPandas.iloc[:8]

Unnamed: 0,date_time,site_name,posa_continent,user_location_country,user_location_region,user_location_city,orig_destination_distance,user_id,is_mobile,is_package,...,srch_children_cnt,srch_rm_cnt,srch_destination_id,srch_destination_type_id,is_booking,cnt,hotel_continent,hotel_country,hotel_market,hotel_cluster
0,2014-12-25 03:03:12,24,2,3,64,9448,,2451,1,0,...,0,1,8785,1,0,1,6,105,35,2
1,2014-12-25 15:32:48,24,2,3,51,9527,,2451,1,0,...,0,1,8785,1,0,1,6,105,35,29
2,2014-04-19 09:15:46,2,3,66,174,16634,3.3379,3313,0,0,...,2,1,61531,6,0,1,2,50,1241,91
3,2014-05-07 14:23:23,2,3,66,189,6881,,3925,0,0,...,0,1,12004,1,0,3,2,50,480,32
4,2014-09-20 15:54:18,2,3,66,189,22336,,3925,1,0,...,0,1,26242,6,0,1,2,198,391,83
5,2014-07-09 12:06:45,25,2,23,48,4924,,3972,1,0,...,1,1,12216,6,0,1,2,50,365,72
6,2014-07-10 01:21:52,25,2,23,48,4924,,3972,1,0,...,1,1,8278,1,0,1,2,50,368,41
7,2014-07-10 23:43:22,25,2,23,48,4924,,3972,1,0,...,1,1,12266,6,0,1,2,50,368,72


In [11]:
# Number of rows in the sampled pandas DataFrame
dfExpediaPandas.shape[0]

526520

Connect to MySQL database and insert dataset

In [9]:
import sqlalchemy as sql

# Create connection variables (credentials are read from the environment)
DB_USER = os.environ.get('DB_USER')
DB_PASS = os.environ.get('DB_PASS')
DB_HOST = 'localhost'
DB_PORT = 3306
DATABASE = 'expedia'

# Columns of the `train` table, in the order they are inserted.
TRAIN_COLUMNS = [
    'date_time',
    'site_name',
    'posa_continent',
    'user_location_country',
    'user_location_region',
    'user_location_city',
    'orig_destination_distance',
    'user_id',
    'is_mobile',
    'is_package',
    'channel',
    'srch_ci',
    'srch_co',
    'srch_adults_cnt',
    'srch_children_cnt',
    'srch_rm_cnt',
    'srch_destination_id',
    'srch_destination_type_id',
    'is_booking',
    'cnt',
    'hotel_continent',
    'hotel_country',
    'hotel_market',
    'hotel_cluster',
]

# Number of rows sent to the server (and committed) per round trip.
INSERT_BATCH_SIZE = 10000

# Connect to the database for pymysql connection
connection = pymysql.connect(host=DB_HOST,
                             port=DB_PORT,
                             user=DB_USER,
                             password=DB_PASS,
                             db=DATABASE,
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor)

# Create connection string for sqlalchemy
connect_string = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'\
                    .format(DB_USER, DB_PASS, DB_HOST, DB_PORT, DATABASE)

# To setup the persistent connection, you do the following:
sql_engine = sql.create_engine(connect_string)

# Parameterized INSERT statement built from TRAIN_COLUMNS. It is stored under
# its own name so that the `sql` alias keeps referring to the sqlalchemy module.
insert_sql = "INSERT INTO `train` ({}) VALUES ({})".format(
    ", ".join("`{}`".format(column) for column in TRAIN_COLUMNS),
    ", ".join(["%s"] * len(TRAIN_COLUMNS)),
)

# Ensure the features are in the appropriate order to be inserted into the
# database. Selecting by name (and keeping the result) makes the insert
# independent of the column order of dfExpediaPandas.
train_rows = dfExpediaPandas[TRAIN_COLUMNS]

try:
    with connection.cursor() as cursor:
        for batch_start in range(0, len(train_rows), INSERT_BATCH_SIZE):
            batch = train_rows.iloc[batch_start:batch_start + INSERT_BATCH_SIZE]
            # Every value is sent as text because the table columns are TEXT.
            # itertuples yields each value with its own column's type, so
            # integers are not upcast to floats before being stringified.
            # NOTE(review): str() turns missing values into literal text such
            # as 'nan' rather than SQL NULL - confirm that is intended.
            params = [
                tuple(str(value) for value in row)
                for row in batch.itertuples(index=False, name=None)
            ]
            cursor.executemany(insert_sql, params)
            # connection will not autocommit by default, so need to commit the
            # changes to the database (once per batch rather than once per row)
            connection.commit()
except Exception:
    # Discard the partially written batch so a later commit cannot persist it.
    connection.rollback()
    raise