## ETL: Add local timezone to each song played

In [1]:
import os
import lxml

import pandas as pd

#### Use maximum width of notebook

In [2]:
from IPython.display import display, HTML

# Let notebook cells use the full width of the browser window.
display(HTML("<style>.container { width:100% !important; }</style>"))

# pandas display limits for wide/long frames.
# - 'display.height' is not set any more: the option was removed from pandas
#   (superseded by 'display.max_rows'), and setting an unknown option raises,
#   which aborted the rest of this cell.
# - 'max_colwidth' and 'expand_frame_repr' used to be set twice with different
#   values; only the last value of each ever took effect, so only that one is kept.
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', 500)
pd.set_option('display.expand_frame_repr', True)

In [None]:
!whoami

In [None]:
os.chdir("/data_data/session_length/")
!pwd

In [None]:
!env

### Make sure no other Spark jobs are running before doing this

In [None]:
#import pymongo
#import pymongo_spark

#pymongo_spark.activate()


In [None]:
#sc.stop()
#spark.stop()

### If you want to restart YARN, etc.

In [None]:
# `!export ...` runs in a throwaway subshell, so the variables never reach this
# kernel or any later `!` command. Set them on the kernel's own environment,
# which child processes started from the notebook inherit.
os.environ["HADOOP_HOME"] = "/usr/local/hadoop"
_hadoop_dirs = ["/usr/local/hadoop/bin", "/usr/local/hadoop/sbin"]
_path_entries = [p for p in os.environ.get("PATH", "").split(os.pathsep) if p]
# Append only what is missing so re-running the cell does not keep growing PATH.
os.environ["PATH"] = os.pathsep.join(
    _path_entries + [d for d in _hadoop_dirs if d not in _path_entries]
)

In [None]:
#!/usr/local/hadoop/sbin/stop-all.sh
#!/usr/local/hadoop/sbin/start-all.sh

In [None]:
APP_NAME = "ETL Add Local TimeZone and is_holiday flag"

import pyspark
import pyspark.sql
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Stop whatever Spark handles a previous run of this cell left in the notebook
# namespace, so that a fresh context can be created with the settings below.
# Each handle is looked up and stopped on its own: a missing name (first run)
# is simply skipped, and a failing stop() is reported instead of being
# silently swallowed.
for _handle_name in ("sc", "spark"):
    _stale = globals().get(_handle_name)
    if _stale is None:
        continue
    try:
        _stale.stop()
    except Exception as exc:  # best effort: keep going and build a new context
        print("Could not stop existing {}: {}".format(_handle_name, exc))

# Driver/executor sizing and SQL settings for this job. APP_NAME is applied
# here so the job is identifiable in the cluster UI.
spark_conf = (
    SparkConf()
    .setAppName(APP_NAME)
    .set("spark.sql.autoBroadcastJoinThreshold", -1)
    .set("spark.driver.maxResultSize", "40g")
    .set("spark.sql.execution.arrow.enabled", "true")
    .set('spark.sql.broadcastTimeout', 1000)
    .set('spark.local.dir', '/data_data/session_length/spark_tmp/')
    .set('spark.driver.memory', '80G')
    .set("spark.executor.instances", "20")
    .set("spark.executor.cores", 20)
    .set("spark.executor.memory", "12G")
)

sc = SparkContext(conf=spark_conf)
spark = SparkSession(sc)
spark.sparkContext.setLogLevel("ERROR")
    

### Check configuration of Spark Environment

In [None]:
# List every (key, value) pair of the active context's configuration through
# the public accessor rather than the private `_conf` attribute.
sc.getConf().getAll()

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, LongType, DateType, DoubleType, IntegerType
from pyspark.sql.functions import count, mean, stddev_pop, min, max, lit, round, bround, pow, col, corr, lower, upper, avg, stddev, abs, log
from pyspark.sql.functions import lit, trim, rtrim, rpad, trim, coalesce
from pyspark.sql.functions import current_date, current_timestamp, date_add, date_sub, months_between, to_date
from pyspark.sql.functions import udf, col, sum
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, dense_rank, rank, expr

from pyspark.ml import Pipeline
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import LogisticRegression, GBTClassifier, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from datetime import datetime

### Import Data

#### Import utility tools

#### Import both data sets

In [None]:
!head data/lastFM/lastfm-dataset-1K/userid-profile.tsv

In [None]:
users_df = None

In [None]:
from_pattern = 'MMM dd, yyyy'
to_pattern = 'yyyy-MM-dd'

def import_user_data(date_pattern=from_pattern,
                     path='/data_data/session_length/data/lastFM/lastfm-dataset-1K/userid-profile.tsv'):
    """Load the user-profile TSV into the module-level ``users_df``.

    The file is read with an explicit schema (userid, gender, age, country,
    registered). A ``reg_date`` column is derived from ``registered`` and rows
    without a country are dropped. Both the raw and the final frame are
    previewed with ``show``.

    Args:
        date_pattern: Pattern used to parse the ``registered`` column. The
            default is bound when this function is defined, so a later cell
            that rebinds the global ``from_pattern`` to a different format can
            no longer change how registration dates are parsed.
        path: Location of the tab-separated profile file.

    Returns:
        None. The result is published through the global ``users_df``.
    """
    global users_df
    print("==================================================================================================================")
    print("======================================== IMPORTING USERID-PROFILE.CSV  ======================================")
    print("==================================================================================================================\n\n")
    userSchema = StructType([
        StructField('userid',  StringType(), True),
        StructField('gender',  StringType(), True),
        StructField('age',     IntegerType(), True),
        StructField('country', StringType(), True),
        StructField('registered', StringType(), True)])
    users = spark.read.format('csv').schema(userSchema).option("sep","\t").load(path)
    users.show(5,False)
    users_df = users.withColumn('reg_date', to_date(col("registered"), date_pattern)).na.drop(subset=["country"])
    users_df.show(5,False)

In [None]:
import_user_data()

#### Import music_data

In [None]:
!head data/lastFM/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv

In [None]:
sessions_DF = None
sessions_df = None

In [None]:
# NOTE: these rebind the globals of the same name used for the user-profile
# dates; nothing in this cell reads them.
from_pattern = 'yyyy-MM-ddTHH:mm:ssZ'
to_pattern = 'yyyy-MM-dd'

def get_window(x):
    """Return the hour field of a '<date>T<HH>:<MM>:...' string as text.

    The hour is normalised through ``int`` (so '08' becomes '8') and returned
    as a string, which is the type the ``session_window`` column has always
    had. Missing or malformed input yields None instead of raising, so a
    single bad row cannot fail the whole Spark job.
    """
    if x is None:
        return None
    try:
        return str(int(str(x).split("T")[1].split(":")[0]))
    except (IndexError, ValueError):
        return None

def import_session_data():
    """Load the listening-history TSV into the module-level ``sessions_DF``.

    Reads the file with an explicit six-column schema, adds ``session_date``
    (date part of ``timestamp``) and ``session_window`` (hour of ``timestamp``,
    see ``get_window``), then drops the artist/track columns. The raw and the
    final frame are previewed with ``show``.

    Returns:
        None. The result is published through the global ``sessions_DF``.
    """
    global sessions_DF
    print("====================================================================================================================")
    print("====================================userid-timestamp-artid-artname-traid-traname.tsv================================")
    print("==================================================================================================================\n\n")
    sessionSchema = StructType([
        StructField('userid',  StringType(), True),
        StructField('timestamp',  StringType(), True),
        StructField('artid',     StringType(), True),
        StructField('artname_', StringType(), True),
        StructField('traid', StringType(), True),
        StructField('traname_', StringType(), True)
        ])
    sessions = spark.read.format('csv').schema(sessionSchema).option("sep","\t").load('/data_data/session_length/data/lastFM/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv')
    sessions.show(5,False)
    # Assign a window to each play, using the hour as the window. The return
    # type is spelled out; it matches the implicit string default used before.
    getWindow = udf(get_window, StringType())
    sessions_DF = (sessions
                   .withColumn('session_date', to_date(col('timestamp')))
                   .withColumn('session_window', getWindow(col('timestamp')))
                   .drop('artid','artname_','traid','traname_'))
    sessions_DF.show(5,False)
    # NOTE: translating artist/track names to English was experimented with
    # here through a `gs` translator handle that is not defined in this
    # notebook; the resulting UDF was never applied, so it has been removed.

In [None]:
!whoami

In [None]:
import_session_data()

In [None]:
#from pyspark.sql.types import Row

#row = Row("artname_")
#df_sessions = sessions_df.map(row).toDF()
#schema = StructType([StructField("artname", StringType(), True)])
#df_sessions = sessions_df.toDF(["artname"],)
#[gs.translate(x,'en') for x in df_sessions]
#df_sessions = sessions_df.flatMap(gs.translate)
#df_sessions.take(5)

In [None]:
users_df.show()

In [None]:
users_df.filter("userid = 'user_000004'").show(20)

#### Denormalize by joining user and session databases

In [None]:
# Rename the user key first so it is distinguishable from the session key once
# both frames are joined.
users_df = users_df.withColumnRenamed('userid', 'user_id')

u, s = users_df.alias('u'), sessions_DF.alias('s')

# Inner join: only plays whose user has a profile row are kept. The duplicated
# key column from the user side is dropped afterwards.
join_condition = [u.user_id == s.userid]
joined = s.join(u, join_condition, 'inner')
sessionsDF = joined.drop('user_id')

sessionsDF.show(20, False)

#### Save as TSV file for Time Series analysis

#### Uncomment if you want a fresh copy of the file

In [None]:
#!rm -R /data_data/session_length/data/tmp
#sessionsDF.coalesce(1).write.option("delimiter", "\t").csv('/data_data/session_length/data/tmp', header="True")
#!mv /data_data/session_length/data/tmp/part-* /data_data/session_length/data/Session_data.csv

In [None]:
!head /data_data/session_length/data/Session_data.csv

### Get Local time by converting from UTC to Local [WORK IN PROGRESS]

In [None]:
from dateutil import tz
import pytz

### Use countryInfo from here: https://gist.githubusercontent.com/pamelafox/986163/raw/f5f9db4f1b287804fd07ffb3296ed0036292bc7a/countryinfo.py

In [None]:
countries_info = [
{'timezones': ['Europe/Albania'], 'code': 'AD', 'continent': 'Europe', 'name': 'Albania', 'capital': 'Skopje'},
{'timezones': ['Europe/Madrid'], 'code': 'AD', 'continent': 'Europe', 'name': 'Andorra', 'capital': 'Madrid'},
{'timezones': ['Asia/Kabul'], 'code': 'AF', 'continent': 'Asia', 'name': 'Afghanistan', 'capital': 'Kabul'},
{'timezones': ['America/Antigua'], 'code': 'AG', 'continent': 'America', 'name': 'Antigua and Barbuda', 'capital': "Antigua"},
{'timezones': ['Europe/Tirane'], 'code': 'AL', 'continent': 'Europe', 'name': 'Albania', 'capital': 'Tirana'},
{'timezones': ['Asia/Yerevan'], 'code': 'AM', 'continent': 'Asia', 'name': 'Armenia', 'capital': 'Yerevan'},
{'timezones': ['Africa/Luanda'], 'code': 'AO', 'continent': 'Africa', 'name': 'Angola', 'capital': 'Luanda'},
{'timezones': ['America/Argentina/Buenos_Aires', 'America/Argentina/Cordoba', 'America/Argentina/Jujuy', 'America/Argentina/Tucuman', 'America/Argentina/Catamarca', 'America/Argentina/La_Rioja', 'America/Argentina/San_Juan', 'America/Argentina/Mendoza', 'America/Argentina/Rio_Gallegos', 'America/Argentina/Ushuaia'], 'code': 'AR', 'continent': 'America', 'name': 'Argentina', 'capital': 'Buenos Aires'},
{'timezones': ['Europe/Vienna'], 'code': 'AT', 'continent': 'Europe', 'name': 'Austria', 'capital': 'Vienna'},
{'timezones': ['Australia/Sydney'],'code':'AU', 'continent': 'Australia', 'name':'Australia', 'capital':'Sydney'},
{'timezones': ['Asia/Baku'], 'code': 'AZ', 'continent': 'Asia', 'name': 'Azerbaijan', 'capital': 'Baku'},
{'timezones': ['America/Barbados'], 'code': 'BB', 'continent': 'America', 'name': 'Barbados', 'capital': 'Barbados'},
{'timezones': ['Asia/Dhaka'], 'code': 'BD', 'continent': 'Asia', 'name': 'Bangladesh', 'capital': 'Dhaka'},
{'timezones': ['Europe/Brussels'], 'code': 'BE', 'continent': 'Europe', 'name': 'Belgium', 'capital': 'Brussels'},
{'timezones': ['Africa/Ouagadougou'], 'code': 'BF', 'continent': 'Africa', 'name': 'Burkina Faso', 'capital': 'Ouagadougou'},
{'timezones': ['Europe/Sofia'], 'code': 'BG', 'continent': 'Europe', 'name': 'Bulgaria', 'capital': 'Sofia'},
{'timezones': ['Asia/Bahrain'], 'code': 'BH', 'continent': 'Asia', 'name': 'Bahrain', 'capital': 'Bahrain'},
{'timezones': ['Africa/Bujumbura'], 'code': 'BI', 'continent': 'Africa', 'name': 'Burundi', 'capital': 'Bujumbura'},
{'timezones': ['Africa/Porto-Novo'], 'code': 'BJ', 'continent': 'Africa', 'name': 'Benin', 'capital': 'Porto-Novo'},
{'timezones': ['Asia/Brunei'], 'code': 'BN', 'continent': 'Asia', 'name': 'Brunei Darussalam', 'capital': 'Brunei'},
{'timezones': ['America/La_Paz'], 'code': 'BO', 'continent': 'America', 'name': 'Bolivia', 'capital': 'La_Paz'},
{'timezones': ['America/Sao_Paulo'], 'code': 'BR', 'continent': 'America', 'name': 'Brazil', 'capital': 'Sao_Paulo'},
{'timezones': ['America/Nassau'], 'code': 'BS', 'continent': 'America', 'name': 'Bahamas', 'capital': 'Nassau'},
{'timezones': ['Asia/Thimphu'], 'code': 'BT', 'continent': 'Asia', 'name': 'Bhutan', 'capital': 'Thimphu'},
{'timezones': ['Africa/Gaborone'], 'code': 'BW', 'continent': 'Africa', 'name': 'Botswana', 'capital': 'Gaborone'},
{'timezones': ['Europe/Minsk'], 'code': 'BY', 'continent': 'Europe', 'name': 'Belarus', 'capital': 'Minsk'},
{'timezones': ['America/Belize'], 'code': 'BZ', 'continent': 'America', 'name': 'Belize', 'capital': 'Belize'},
{'timezones': ['Canada/Eastern'], 'code': 'CA', 'continent': 'Canada', 'name': 'Canada', 'capital': 'Eastern'},
{'timezones': ['Africa/Kinshasa', 'Africa/Lubumbashi'], 'code': 'CD', 'continent': 'Africa', 'name': 'Democratic Republic of the Congo', 'capital': 'Kinshasa'},
{'timezones': ['Africa/Brazzaville'], 'code': 'CG', 'continent': 'Africa', 'name': 'Republic of the Congo', 'capital': 'Brazzaville'},
{'timezones': ['Africa/Abidjan'], 'code': 'CI', 'continent': 'Africa', 'name': "Cote d'Ivoire", 'capital': 'Abidjan'},
{'timezones': ['Africa/Abidjan'], 'code': 'CI', 'continent': 'Africa', 'name': "C\\u00f4te d'Ivoire", 'capital': 'Abidjan'},
{'timezones': ['America/Santiago', 'Pacific/Easter'], 'code': 'CL', 'continent': 'America', 'name': 'Chile', 'capital': 'Santiago'},
{'timezones': ['Africa/Douala'], 'code': 'CM', 'continent': 'Africa', 'name': 'Cameroon', 'capital': 'Lagos'},
{'timezones': ['Asia/Shanghai', 'Asia/Harbin', 'Asia/Chongqing', 'Asia/Urumqi', 'Asia/Kashgar'], 'code': 'CN', 'continent': 'Asia', 'name': "People's Republic of China", 'capital': 'Shanghai'},
{'timezones': ['Asia/Shanghai', 'Asia/Harbin', 'Asia/Chongqing', 'Asia/Urumqi', 'Asia/Kashgar'], 'code': 'CN', 'continent': 'Asia', 'name': "China", 'capital': 'Shanghai'},
{'timezones': ['America/Bogota'], 'code': 'CO', 'continent': 'America', 'name': 'Colombia', 'capital': 'Bogota'},
{'timezones': ['America/Costa_Rica'], 'code': 'CR', 'continent': 'America', 'name': 'Costa Rica', 'capital': 'Costa_Rica'},
{'timezones': ['America/Havana'], 'code': 'CU', 'continent': 'America', 'name': 'Cuba', 'capital': 'Havana'},
{'timezones': ['Atlantic/Cape_Verde'], 'code': 'CV', 'continent': 'Atlantic', 'name': 'Cape Verde', 'capital': 'Cape_Verde'},
{'timezones': ['Asia/Nicosia'], 'code': 'CY', 'continent': 'Asia', 'name': 'Cyprus', 'capital': 'Nicosia'},
{'timezones': ['Europe/Prague'], 'code': 'CZ', 'continent': 'Europe', 'name': 'Czech Republic', 'capital': 'Prague'},
{'timezones': ['Europe/Berlin'], 'code': 'DE', 'continent': 'Europe', 'name': 'Germany', 'capital': 'Berlin'},
{'timezones': ['Africa/Djibouti'], 'code': 'DJ', 'continent': 'Africa', 'name': 'Djibouti', 'capital': 'Djibouti City'},
{'timezones': ['Europe/Copenhagen'], 'code': 'DK', 'continent': 'Europe', 'name': 'Denmark', 'capital': 'Copenhagen'},
{'timezones': ['America/Dominica'], 'code': 'DM', 'continent': 'America', 'name': 'Dominica', 'capital': 'Roseau'},
{'timezones': ['America/Santo_Domingo'], 'code': 'DO', 'continent': 'America', 'name': 'Dominican Republic', 'capital': 'Dominica'},
{'timezones': ['America/Guayaquil', 'Pacific/Galapagos'], 'code': 'EC', 'continent': 'America', 'name': 'Ecuador', 'capital': 'Guatemala'},
{'timezones': ['Europe/Tallinn'], 'code': 'EE', 'continent': 'Europe', 'name': 'Estonia', 'capital': 'Tallinn'},
{'timezones': ['Africa/Cairo'], 'code': 'EG', 'continent': 'Africa', 'name': 'Egypt', 'capital': 'Cairo'},
{'timezones': ['Africa/Asmera'], 'code': 'ER', 'continent': 'Africa', 'name': 'Eritrea', 'capital': 'Asmara'},
{'timezones': ['Africa/Addis_Ababa'], 'code': 'ET', 'continent': 'Africa', 'name': 'Ethiopia', 'capital': 'Addis Ababa'},
{'timezones': ['Europe/Helsinki'], 'code': 'FI', 'continent': 'Europe', 'name': 'Finland', 'capital': 'Helsinki'},
{'timezones': ['Pacific/Fiji'], 'code': 'FJ', 'continent': 'Pacific', 'name': 'Fiji', 'capital': 'Fiji'},
{'timezones': ['Europe/Paris'], 'code': 'FR', 'continent': 'Europe', 'name': 'France', 'capital': 'Paris'},
{'timezones': ['Africa/Libreville'], 'code': 'GA', 'continent': 'Africa', 'name': 'Gabon', 'capital': 'Libreville'},
{'timezones': ['Asia/Tbilisi'], 'code': 'GE', 'continent': 'Asia', 'name': 'Georgia', 'capital': 'Tbilisi'},
{'timezones': ['Africa/Accra'], 'code': 'GH', 'continent': 'Africa', 'name': 'Ghana', 'capital': 'Accra'},
{'timezones': ['Africa/Banjul'], 'code': 'GM', 'continent': 'Africa', 'name': 'The Gambia', 'capital': 'Banjul'},
{'timezones': ['Africa/Conakry'], 'code': 'GN', 'continent': 'Africa', 'name': 'Guinea', 'capital': 'Conakry'},
{'timezones': ['Europe/Athens'], 'code': 'GR', 'continent': 'Europe', 'name': 'Greece', 'capital': 'Athens'},
{'timezones': ['America/Guatemala'], 'code': 'GT', 'continent': 'America', 'name': 'Guatemala', 'capital': 'Guatemala'},
{'timezones': ['America/Guatemala'], 'code': 'GT', 'continent': 'America', 'name': 'Haiti', 'capital': 'Port-au-Prince'},
{'timezones': ['Africa/Bissau'], 'code': 'GW', 'continent': 'Africa', 'name': 'Guinea-Bissau', 'capital': 'Bissau'},
{'timezones': ['America/Guyana'], 'code': 'GY', 'continent': 'America', 'name': 'Guyana', 'capital': 'Georgetown'},
{'timezones': ['America/Tegucigalpa'], 'code': 'HN', 'continent': 'America', 'name': 'Honduras', 'capital': 'Tegucigalpa'},
{'timezones': ['Europe/Budapest'], 'code': 'HU', 'continent': 'Europe', 'name': 'Hungary', 'capital': 'Budapest'},
{'timezones': ['Asia/Jakarta', 'Asia/Pontianak', 'Asia/Makassar', 'Asia/Jayapura'], 'code': 'ID', 'continent': 'Asia', 'name': 'Indonesia', 'capital': 'Jakarta'},
{'timezones': ['Europe/Dublin'], 'code': 'IE', 'continent': 'Europe', 'name': 'Republic of Ireland', 'capital': 'Dublin'},
{'timezones': ['Asia/Jerusalem'], 'code': 'IL', 'continent': 'Asia', 'name': 'Israel', 'capital': 'Jerusalem'},
{'timezones': ['Asia/Calcutta'], 'code': 'IN', 'continent': 'Asia', 'name': 'India', 'capital': 'Calcutta'},
{'timezones': ['Asia/Baghdad'], 'code': 'IQ', 'continent': 'Asia', 'name': 'Iraq', 'capital': 'Baghdad'},
{'timezones': ['Asia/Tehran'], 'code': 'IR', 'continent': 'Asia', 'name': 'Iran', 'capital': 'Tehran'},
{'timezones': ['Atlantic/Reykjavik'], 'code': 'IS', 'continent': 'Europe', 'name': 'Iceland', 'capital': 'Iceland'},
{'timezones': ['Europe/Rome'], 'code': 'IT', 'continent': 'Europe', 'name': 'Italy', 'capital': 'Rome'},
{'timezones': ['America/Jamaica'], 'code': 'JM', 'continent': 'America', 'name': 'Jamaica', 'capital': 'St_Thomas'},
{'timezones': ['Asia/Amman'], 'code': 'JO', 'continent': 'Asia', 'name': 'Jordan', 'capital': 'Amman'},
{'timezones': ['Asia/Tokyo'], 'code': 'JP', 'continent': 'Asia', 'name': 'Japan', 'capital': 'Tokyo'},
{'timezones': ['Africa/Nairobi'], 'code': 'KE', 'continent': 'Africa', 'name': 'Kenya', 'capital': 'Nairobi'},
{'timezones': ['Asia/Bishkek'], 'code': 'KG', 'continent': 'Asia', 'name': 'Kyrgyzstan', 'capital': 'Bishkek'},
{'timezones': ['Pacific/Tarawa', 'Pacific/Enderbury', 'Pacific/Kiritimati'], 'code': 'KI', 'continent': 'Oceania', 'name': 'Kiribati', 'capital': 'Tarawa'},
{'timezones': ['Asia/Pyongyang'], 'code': 'KP', 'continent': 'Asia', 'name': 'North Korea', 'capital': 'Pyongyang'},
{'timezones': ['Asia/Seoul'], 'code': 'KR', 'continent': 'Asia', 'name': 'South Korea', 'capital': 'Seoul'},
{'timezones': ['Asia/Kuwait'], 'code': 'KW', 'continent': 'Asia', 'name': 'Kuwait', 'capital': 'Kuwait'},
{'timezones': ['Asia/Beirut'], 'code': 'LB', 'continent': 'Asia', 'name': 'Lebanon', 'capital': 'Beirut'},
{'timezones': ['Europe/Vaduz'], 'code': 'LI', 'continent': 'Europe', 'name': 'Liechtenstein', 'capital': 'Vaduz'},
{'timezones': ['Africa/Monrovia'], 'code': 'LR', 'continent': 'Africa', 'name': 'Liberia', 'capital': 'Monrovia'},
{'timezones': ['Africa/Maseru'], 'code': 'LS', 'continent': 'Africa', 'name': 'Lesotho', 'capital': 'Maseru'},
{'timezones': ['Europe/Vilnius'], 'code': 'LT', 'continent': 'Europe', 'name': 'Lithuania', 'capital': 'Vilnius'},
{'timezones': ['Europe/Luxembourg'], 'code': 'LU', 'continent': 'Europe', 'name': 'Luxembourg', 'capital': 'Luxembourg City'},
{'timezones': ['Europe/Riga'], 'code': 'LV', 'continent': 'Europe', 'name': 'Latvia', 'capital': 'Riga'},
{'timezones': ['Africa/Tripoli'], 'code': 'LY', 'continent': 'Africa', 'name': 'Libya', 'capital': 'Tripoli'},
{'timezones': ['Indian/Antananarivo'], 'code': 'MG', 'continent': 'Indian', 'name': 'Madagascar', 'capital': 'Antananarivo'},
{'timezones': ['Pacific/Majuro', 'Pacific/Kwajalein'], 'code': 'MH', 'continent': 'Oceania', 'name': 'Marshall Islands', 'capital': 'Majuro'},
{'timezones': ['Europe/Skopje'], 'code': 'MK', 'continent': 'Europe', 'name': 'Macedonia', 'capital': 'Skopje'},
{'timezones': ['Africa/Bamako'], 'code': 'ML', 'continent': 'Africa', 'name': 'Mali', 'capital': 'Bamako'},
{'timezones': ['Asia/Rangoon'], 'code': 'MM', 'continent': 'Asia', 'name': 'Myanmar', 'capital': 'Naypyidaw'},
{'timezones': ['Asia/Ulaanbaatar', 'Asia/Hovd', 'Asia/Choibalsan'], 'code': 'MN', 'continent': 'Asia', 'name': 'Mongolia', 'capital': 'Ulaanbaatar'},
{'timezones': ['Africa/Nouakchott'], 'code': 'MR', 'continent': 'Africa', 'name': 'Mauritania', 'capital': 'Nouakchott'},
{'timezones': ['Europe/Malta'], 'code': 'MT', 'continent': 'Europe', 'name': 'Malta', 'capital': 'Valletta'},
{'timezones': ['Indian/Mauritius'], 'code': 'MU', 'continent': 'Africa', 'name': 'Mauritius', 'capital': 'Port Louis'},
{'timezones': ['Indian/Maldives'], 'code': 'MV', 'continent': 'Asia', 'name': 'Maldives', 'capital': 'Male'},
{'timezones': ['Africa/Blantyre'], 'code': 'MW', 'continent': 'Africa', 'name': 'Malawi', 'capital': 'Lusaka'},
{'timezones': ['America/Mexico_City', 'America/Cancun', 'America/Merida', 'America/Monterrey', 'America/Mazatlan', 'America/Chihuahua', 'America/Hermosillo', 'America/Tijuana'], 'code': 'MX', 'continent': 'America', 'name': 'Mexico', 'capital': 'Mexico City'},
{'timezones': ['Asia/Kuala_Lumpur', 'Asia/Kuching'], 'code': 'MY', 'continent': 'Asia', 'name': 'Malaysia', 'capital': 'Kuala Lumpur'},
{'timezones': ['Africa/Maputo'], 'code': 'MZ', 'continent': 'Africa', 'name': 'Mozambique', 'capital': 'Maputo'},
{'timezones': ['Africa/Windhoek'], 'code': 'NA', 'continent': 'Africa', 'name': 'Namibia', 'capital': 'Windhoek'},
{'timezones': ['Africa/Niamey'], 'code': 'NE', 'continent': 'Africa', 'name': 'Niger', 'capital': 'Niamey'},
{'timezones': ['Africa/Lagos'], 'code': 'NG', 'continent': 'Africa', 'name': 'Nigeria', 'capital': 'Lagos'},
{'timezones': ['America/Managua'], 'code': 'NI', 'continent': 'America', 'name': 'Nicaragua', 'capital': 'Managua'},
{'timezones': ['Europe/Amsterdam'], 'code': 'NL', 'continent': 'Europe', 'name': 'Kingdom of the Netherlands', 'capital': 'Amsterdam'},
{'timezones': ['Europe/Oslo'], 'code': 'NO', 'continent': 'Europe', 'name': 'Norway', 'capital': 'Oslo'},
{'timezones': ['Asia/Katmandu'], 'code': 'NP', 'continent': 'Asia', 'name': 'Nepal', 'capital': 'Kathmandu'},
{'timezones': ['Pacific/Nauru'], 'code': 'NR', 'continent': 'Oceania', 'name': 'Nauru', 'capital': 'Yaren'},
{'timezones': ['Pacific/Auckland', 'Pacific/Chatham'], 'code': 'NZ', 'continent': 'Oceania', 'name': 'New Zealand', 'capital': 'Auckland'},
{'timezones': ['Asia/Muscat'], 'code': 'OM', 'continent': 'Asia', 'name': 'Oman', 'capital': 'Muscat'},
{'timezones': ['America/Panama'], 'code': 'PA', 'continent': 'America', 'name': 'Panama', 'capital': 'Panama'},
{'timezones': ['America/Lima'], 'code': 'PE', 'continent': 'America', 'name': 'Peru', 'capital': 'Lima'},
{'timezones': ['Pacific/Port_Moresby'], 'code': 'PG', 'continent': 'Oceania', 'name': 'Papua New Guinea', 'capital': 'Port_Moresby'},
{'timezones': ['Asia/Manila'], 'code': 'PH', 'continent': 'Asia', 'name': 'Philippines', 'capital': 'Manila'},
{'timezones': ['Asia/Karachi'], 'code': 'PK', 'continent': 'Asia', 'name': 'Pakistan', 'capital': 'Karachi'},
{'timezones': ['Europe/Warsaw'], 'code': 'PL', 'continent': 'Europe', 'name': 'Poland', 'capital': 'Warsaw'},
{'timezones': ['Europe/Lisbon', 'Atlantic/Madeira', 'Atlantic/Azores'], 'code': 'PT', 'continent': 'Europe', 'name': 'Portugal', 'capital': 'Lisbon'},
{'timezones': ['Pacific/Palau'], 'code': 'PW', 'continent': 'Oceania', 'name': 'Palau', 'capital': 'Ngerulmud'},
{'timezones': ['America/Asuncion'], 'code': 'PY', 'continent': 'America', 'name': 'Paraguay', 'capital': 'Asuncicion'},
{'timezones': ['Asia/Qatar'], 'code': 'QA', 'continent': 'Asia', 'name': 'Qatar', 'capital': 'Qatar'},
{'timezones': ['Europe/Bucharest'], 'code': 'RO', 'continent': 'Europe', 'name': 'Romania', 'capital': 'Bucharest'},
{'timezones': ['Europe/Kaliningrad', 'Europe/Moscow'], 'code': 'RU', 'continent': 'Europe', 'name': 'Russia', 'capital': 'Moscow'},
{'timezones': ['Europe/Kaliningrad', 'Europe/Moscow'], 'code': 'RU', 'continent': 'Europe', 'name': 'Russian Federation', 'capital': 'Moscow'},

{'timezones': ['Africa/Kigali'], 'code': 'RW', 'continent': 'Africa', 'name': 'Rwanda', 'capital': 'Kigali'},
{'timezones': ['Asia/Riyadh'], 'code': 'SA', 'continent': 'Asia', 'name': 'Saudi Arabia', 'capital': 'Riyadh'},
{'timezones': ['Pacific/Guadalcanal'], 'code': 'SB', 'continent': 'Oceania', 'name': 'Solomon Islands', 'capital': 'Honiara'},
{'timezones': ['Indian/Mahe'], 'code': 'SC', 'continent': 'Africa', 'name': 'Seychelles', 'capital': 'Victoria'},
{'timezones': ['Africa/Khartoum'], 'code': 'SD', 'continent': 'Africa', 'name': 'Sudan', 'capital': 'Khartoum'},
{'timezones': ['Europe/Stockholm'], 'code': 'SE', 'continent': 'Europe', 'name': 'Sweden', 'capital': 'Stockholm'},
{'timezones': ['Asia/Singapore'], 'code': 'SG', 'continent': 'Asia', 'name': 'Singapore', 'capital': 'Singapore'},
{'timezones': ['Europe/Ljubljana'], 'code': 'SI', 'continent': 'Europe', 'name': 'Slovenia', 'capital': 'Ljubljana'},
{'timezones': ['Europe/Bratislava'], 'code': 'SK', 'continent': 'Europe', 'name': 'Slovakia', 'capital': 'Bratislava'},
{'timezones': ['Africa/Freetown'], 'code': 'SL', 'continent': 'Africa', 'name': 'Sierra Leone', 'capital': 'Freetown'},
{'timezones': ['Europe/San_Marino'], 'code': 'SM', 'continent': 'Europe', 'name': 'San Marino', 'capital': 'San Marino'},
{'timezones': ['Africa/Dakar'], 'code': 'SN', 'continent': 'Africa', 'name': 'Senegal', 'capital': 'Dakar'},
{'timezones': ['Africa/Mogadishu'], 'code': 'SO', 'continent': 'Africa', 'name': 'Somalia', 'capital': 'Mogadishu'},
{'timezones': ['America/Paramaribo'], 'code': 'SR', 'continent': 'America', 'name': 'Suriname', 'capital': 'Paramaribo'},
{'timezones': ['Africa/Sao_Tome'], 'code': 'ST', 'continent': 'Africa', 'name': 'Sao Tome and Principe', 'capital': 'Sao Tome'},
{'timezones': ['Asia/Damascus'], 'code': 'SY', 'continent': 'Asia', 'name': 'Syria', 'capital': 'Damascus'},
{'timezones': ['Africa/Lome'], 'code': 'TG', 'continent': 'Africa', 'name': 'Togo', 'capital': 'Lome'},
{'timezones': ['Asia/Bangkok'], 'code': 'TH', 'continent': 'Asia', 'name': 'Thailand', 'capital': 'Bangkok'},
{'timezones': ['Asia/Dushanbe'], 'code': 'TJ', 'continent': 'Asia', 'name': 'Tajikistan', 'capital': 'Dushanbe'},
{'timezones': ['Asia/Ashgabat'], 'code': 'TM', 'continent': 'Asia', 'name': 'Turkmenistan', 'capital': 'Ashgabat'},
{'timezones': ['Africa/Tunis'], 'code': 'TN', 'continent': 'Africa', 'name': 'Tunisia', 'capital': 'Tunis'},
{'timezones': ['Pacific/Tongatapu'], 'code': 'TO', 'continent': 'Oceania', 'name': 'Tonga', 'capital': 'Nukualofa'},
{'timezones': ['Europe/Istanbul'], 'code': 'TR', 'continent': 'Asia', 'name': 'Turkey', 'capital': 'Istanbul'},
{'timezones': ['America/Port_of_Spain'], 'code': 'TT', 'continent': 'America', 'name': 'Trinidad and Tobago', 'capital': 'Port of Spain'},
{'timezones': ['Pacific/Funafuti'], 'code': 'TV', 'continent': 'Oceania', 'name': 'Tuvalu', 'capital': 'Funafuti'},
{'timezones': ['Africa/Dar_es_Salaam'], 'code': 'TZ', 'continent': 'Africa', 'name': 'Tanzania', 'capital': 'Dar_es_Salam'},
{'timezones': ['Europe/Kiev', 'Europe/Uzhgorod', 'Europe/Zaporozhye', 'Europe/Simferopol'], 'code': 'UA', 'continent': 'Europe', 'name': 'Ukraine', 'capital': 'Kiev'},
{'timezones': ['Africa/Kampala'], 'code': 'UG', 'continent': 'Africa', 'name': 'Uganda', 'capital': 'Kampala'},
{'timezones': ['America/Nome'], 'code': 'US', 'continent': 'US', 'name': 'United States', 'capital': 'Central'},
{'timezones': ['America/Montevideo'], 'code': 'UY', 'continent': 'America', 'name': 'Uruguay', 'capital': 'Montevideo'},
{'timezones': ['Asia/Samarkand', 'Asia/Tashkent'], 'code': 'UZ', 'continent': 'Asia', 'name': 'Uzbekistan', 'capital': 'Tashkent'},
{'timezones': ['Europe/Vatican'], 'code': 'VA', 'continent': 'Europe', 'name': 'Vatican City', 'capital': 'Vatican City'},
{'timezones': ['America/Caracas'], 'code': 'VE', 'continent': 'America', 'name': 'Venezuela', 'capital': 'Caracas'},
{'timezones': ['Asia/Saigon'], 'code': 'VN', 'continent': 'Asia', 'name': 'Vietnam', 'capital': 'Phnom_Penh'},
{'timezones': ['Pacific/Efate'], 'code': 'VU', 'continent': 'Oceania', 'name': 'Vanuatu', 'capital': 'Port Vila'},
{'timezones': ['Asia/Aden'], 'code': 'YE', 'continent': 'Asia', 'name': 'Yemen', 'capital': "Aden"},
{'timezones': ['Africa/Lusaka'], 'code': 'ZM', 'continent': 'Africa', 'name': 'Zambia', 'capital': 'Lusaka'},
{'timezones': ['Africa/Harare'], 'code': 'ZW', 'continent': 'Africa', 'name': 'Zimbabwe', 'capital': 'Harare'},
{'timezones': ['Africa/Algiers'], 'code': 'DZ', 'continent': 'Africa', 'name': 'Algeria', 'capital': 'Algiers'},
{'timezones': ['Europe/Sarajevo'], 'code': 'BA', 'continent': 'Europe', 'name': 'Bosnia and Herzegovina', 'capital': 'Sarajevo'},
{'timezones': ['Asia/Phnom_Penh'], 'code': 'KH', 'continent': 'Asia', 'name': 'Cambodia', 'capital': 'Phnom Penh'},
{'timezones': ['Africa/Bangui'], 'code': 'CF', 'continent': 'Africa', 'name': 'Central African Republic', 'capital': 'Bangui'},
{'timezones': ['Africa/Ndjamena'], 'code': 'TD', 'continent': 'Africa', 'name': 'Chad', 'capital': "Ndjamena"},
{'timezones': ['Indian/Comoro'], 'code': 'KM', 'continent': 'Africa', 'name': 'Comoros', 'capital': 'Moroni'},
{'timezones': ['Europe/Zagreb'], 'code': 'HR', 'continent': 'Europe', 'name': 'Croatia', 'capital': 'Zagreb'},
{'timezones': ['Asia/Dili'], 'code': 'TL', 'continent': 'Asia', 'name': 'East Timor', 'capital': 'Dili'},
{'timezones': ['America/El_Salvador'], 'code': 'SV', 'continent': 'America', 'name': 'El Salvador', 'capital': 'Guatemala'},
{'timezones': ['Africa/Malabo'], 'code': 'GQ', 'continent': 'Africa', 'name': 'Equatorial Guinea', 'capital': 'Malabo'},
{'timezones': ['America/Grenada'], 'code': 'GD', 'continent': 'America', 'name': 'Grenada', 'capital': "St. George's"},
{'timezones': ['Asia/Almaty', 'Asia/Qyzylorda', 'Asia/Aqtobe', 'Asia/Aqtau', 'Asia/Oral'], 'code': 'KZ', 'continent': 'Asia', 'name': 'Kazakhstan', 'capital': 'Tashkent'},
{'timezones': ['Asia/Vientiane'], 'code': 'LA', 'continent': 'Asia', 'name': 'Laos', 'capital': 'Vientiane'},
{'timezones': ['Pacific/Truk', 'Pacific/Ponape', 'Pacific/Kosrae'], 'code': 'FM', 'continent': 'Oceania', 'name': 'Federated States of Micronesia', 'capital': 'Palikir'},
{'timezones': ['Europe/Chisinau'], 'code': 'MD', 'continent': 'Europe', 'name': 'Moldova', 'capital': 'Chisinau'},
{'timezones': ['Europe/Monaco'], 'code': 'MC', 'continent': 'Europe', 'name': 'Monaco', 'capital': 'Monaco'},
{'timezones': ['Europe/Podgorica'], 'code': 'ME', 'continent': 'Europe', 'name': 'Montenegro', 'capital': 'Podgorica'},
{'timezones': ['Africa/Casablanca'], 'code': 'MA', 'continent': 'Africa', 'name': 'Morocco', 'capital': 'Lagos'},
{'timezones': ['America/St_Kitts'], 'code': 'KN', 'continent': 'America', 'name': 'Saint Kitts and Nevis', 'capital': 'Basseterre'},
{'timezones': ['America/St_Lucia'], 'code': 'LC', 'continent': 'America', 'name': 'Saint Lucia', 'capital': 'Castries'},
{'timezones': ['America/St_Vincent'], 'code': 'VC', 'continent': 'America', 'name': 'Saint Vincent and the Grenadines', 'capital': 'Kingstown'},
{'timezones': ['Pacific/Apia'], 'code': 'WS', 'continent': 'Pacific', 'name': 'Samoa', 'capital': 'Samoa'},
{'timezones': ['Europe/Belgrade'], 'code': 'RS', 'continent': 'Europe', 'name': 'Serbia', 'capital': 'Belgrade'},
{'timezones': ['Africa/Johannesburg'], 'code': 'ZA', 'continent': 'Africa', 'name': 'South Africa', 'capital': 'Johannesburg'},
{'timezones': ['Europe/Madrid', 'Africa/Ceuta', 'Atlantic/Canary'], 'code': 'ES', 'continent': 'Europe', 'name': 'Spain', 'capital': 'Madrid'},
{'timezones': ['Asia/Colombo'], 'code': 'LK', 'continent': 'Asia', 'name': 'Sri Lanka', 'capital': 'Sri Jayewardenepura Kotte'},
{'timezones': ['Africa/Mbabane'], 'code': 'SZ', 'continent': 'Africa', 'name': 'Swaziland', 'capital': 'Mbabane'},
{'timezones': ['Europe/Zurich'], 'code': 'CH', 'continent': 'Europe', 'name': 'Switzerland', 'capital': 'Zurich'},
{'timezones': ['Asia/Dubai'], 'code': 'AE', 'continent': 'Asia', 'name': 'United Arab Emirates', 'capital': 'Dubai'},
{'timezones': ['Europe/London'], 'code': 'GB', 'continent': 'Europe', 'name': 'United Kingdom', 'capital': 'London'},
]

In [None]:
# Map each country name to a guessed "<Continent>/<Capital>" identifier:
# the last word of 'continent', a slash, then 'capital' with spaces turned into
# underscores and commas removed. When a name occurs more than once in
# countries_info, the last entry wins. The guesses are not validated here;
# the tz_exceptions table overrides some of them.
# Built as a dict comprehension so the cell no longer produces (and echoes) a
# throwaway list of None values.
country_capital_city_tz = {
    entry['name']: entry['continent'].split()[-1] + "/" + entry['capital'].replace(" ", "_").replace(",", "")
    for entry in countries_info
}

In [None]:
country_capital_city_tz['Turkey']

In [None]:
country_capital_city_tz['Malawi']

In [None]:
country_capital_city_tz['United States']

In [None]:
# Manual overrides for countries whose guessed "<Continent>/<Capital>" string
# should not be used as-is.
# Corrected entries (each now uses a zone that countries_info lists for that
# country): Albania, Ecuador, Federated States of Micronesia, Jamaica,
# Kazakhstan, Kiribati, Marshall Islands (was an empty string), Morocco,
# Solomon Islands, Vanuatu.
tz_exceptions = {'Albania': 'Europe/Tirane',
 'Andorra': 'Europe/Madrid',
 'Antigua and Barbuda': 'America/Antigua',
 'Australia': 'Australia/Sydney',
 'Bahrain': 'Asia/Bahrain',
 'Barbados': 'America/Barbados',
 'Belize': 'America/Belize',
 'Bolivia': 'America/La_Paz',
 'Brazil': 'America/Sao_Paulo',
 'Brunei Darussalam': 'Asia/Brunei',
 "C\\u00f4te d'Ivoire": 'Africa/Abidjan',
 'Cameroon': 'Africa/Lagos',
 'Canada': 'Canada/Eastern',
 'Cape Verde': 'Atlantic/Cape_Verde',
 'Chad': 'Africa/Ndjamena',
 'Comoros': 'Indian/Comoro',
 'Costa Rica': 'America/Costa_Rica',
 'Djibouti': 'Africa/Djibouti',
 'Dominica': 'America/Dominica',
 'Ecuador': 'America/Guayaquil',
 'El Salvador': 'America/Guatemala',
 'Federated States of Micronesia': 'Pacific/Ponape',
 'Fiji': 'Pacific/Fiji',
 'Grenada': 'America/Grenada',
 'Guatemala': 'America/Guatemala',
 'Guyana': 'America/Guyana',
 'Iceland': 'Iceland',
 'India': 'Asia/Calcutta',
 'Jamaica': 'America/Jamaica',
 'Kazakhstan': 'Asia/Almaty',
 'Kiribati': 'Pacific/Tarawa',
 'Kuwait': 'Asia/Kuwait',
 'Luxembourg': 'Europe/Luxembourg',
 'Madagascar': 'Indian/Antananarivo',
 'Malawi': 'Africa/Lusaka',
 'Maldives': 'Indian/Maldives',
 'Malta': 'Europe/Malta',
 'Marshall Islands': 'Pacific/Majuro',
 'Mauritius': 'Indian/Mauritius',
 'Morocco': 'Africa/Casablanca',
 'Myanmar': 'Asia/Rangoon',
 'Nauru': 'Pacific/Nauru',
 'New Zealand': 'Pacific/Auckland',
 'Nigeria': 'Africa/Lagos',
 'Pakistan': 'Asia/Karachi',
 'Palau': 'Pacific/Palau',
 'Panama': 'America/Panama',
 'Papua New Guinea': 'Pacific/Port_Moresby',
 'Paraguay': 'America/Asuncion',
 "People's Republic of China": 'Asia/Shanghai',
 'Qatar': 'Asia/Qatar',
 'Saint Kitts and Nevis': 'America/St_Kitts',
 'Saint Lucia': 'America/St_Lucia',
 'Saint Vincent and the Grenadines': 'America/St_Vincent',
 'Samoa': 'Pacific/Samoa',
 'Seychelles': 'Indian/Mauritius',
 'Solomon Islands': 'Pacific/Guadalcanal',
 'South Africa': 'Africa/Johannesburg',
 'Sri Lanka': 'Asia/Calcutta',
 'Switzerland': 'Europe/Zurich',
 'Tanzania': 'Africa/Dar_es_Salaam',
 'Tonga': 'Pacific/Tongatapu',
 'Turkey': 'Asia/Istanbul',
 'Tuvalu': 'Pacific/Funafuti',
 'United Arab Emirates': 'Asia/Dubai',
 'United States': 'US/Central',
 'Vanuatu': 'Pacific/Efate',
 'Vatican City': 'Europe/Vatican',
 'Vietnam': 'Asia/Phnom_Penh',
 'Yemen': 'Asia/Aden'}

In [None]:
# Let every manual override replace (or add) the guessed zone for its country.
for country, override_tz in tz_exceptions.items():
    country_capital_city_tz[country] = override_tz

In [None]:
country_capital_city_tz['Turkey']

In [None]:
country_capital_city_tz['Malawi']

In [None]:
import json

# Persist the country -> time zone lookup as JSON in the working directory.
with open('country_capital_city_tz.json', 'w') as tz_json:
    json.dump(country_capital_city_tz, tz_json)

In [None]:
!head country_capital_city_tz.json

### Add TimeZone information for each session

### SWITCHING TO Map Reduce

In [None]:
import ast

# Sample text made of literal \xNN escape sequences, wrapped in single quotes.
str_ = "'\\xe5\\x9d\\x82\\xe6\\x9c\\xac\\xe9\\xbe\\x8d\\xe4\\xb8\\x80'"
# Display the sample without its first and last character.
str_[1:len(str_) - 1]

In [None]:
%%writefile MRTranslate.py

import os

import mrjob
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob import step
from mrjob.protocol import RawProtocol

from collections import defaultdict, Counter

import re
import sys
import json

import datetime
from datetime import datetime
from dateutil import parser
from dateutil import tz
import pytz

import ast

#import goslate 
#import translate
from googletrans import Translator

import requests


class MRTranslate(MRJob):
    """Map-only job that adds local-time columns to tab-separated session rows.

    For every input row the mapper looks up a time zone for the row's country
    in the JSON file passed as ``--tzFile``, converts the row's UTC timestamp
    to that zone and emits the original columns followed by the time zone,
    the local timestamp, a time-of-day label and the weekday name.
    """

    # Minimum number of tab-separated columns a row needs: the user id plus
    # the eleven value columns unpacked in mapper_translate.
    _EXPECTED_FIELDS = 12

    # Accepted layouts of the UTC timestamp column, tried in order.
    _UTC_FORMATS = ('%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M')

    def __init__(self, *args, **kwargs):
        super(MRTranslate, self).__init__(*args, **kwargs)
        # Rows that are not longer than this many characters are ignored.
        self.min_line_length = 12
        self.translator   = Translator()
        # Keys are the values returned by datetime.weekday().
        self.day_of_week  = {0:"Monday",1:"Tuesday",2:"Wednesday",3:"Thursday",4:"Friday",5:"Saturday",6:"Sunday"}
        # Label -> hours of the day (0-23) covered by that label.
        self.time_of_day  = {"Night":[0,1,2,3,4], "Dawn":[5,6],"Morning":[7,8,9],"LateMorning":[10,11], "Lunch":[12,13],"Afternoon":[14,15,16],"LateAfternoon":[17,18],"Evening":[19,20,21],"LateEvening":[22,23]}
        self.exceptions   = ['Russian Federation', 'Russia', 'Turkey', 'Netherlands', 'China', 'Peoples Republic of China']
        # Cache for the parsed --tzFile content; filled on first use.
        self._tz_dict     = None

    def configure_args(self):
        """Register the job's command-line options."""
        super(MRTranslate, self).configure_args()
        self.add_file_arg('--tzFile', default='/data_data/session_length/country_capital_city_tz.json', dest='tzFile')
        self.add_passthru_arg('--num_mappers',  default=10,  dest="num_mappers",  help="Number of mappers")
        self.add_passthru_arg('--num_reducers', default=10,  dest="num_reducers", help="Number of reducers")
        self.add_passthru_arg('--local_dir',    default='/user/hduser',  dest="local_dir",  help="Working directory for user")
        self.add_passthru_arg('--tmp_dir',      default='/user/hduser',  dest="tmp_dir",    help="Temp directory for user")

    def translate(self, text):
        """Return the translator's result for ``text``, or "" if translation fails."""
        try:
            return self.translator.translate(text)
        except Exception:
            # Best effort: a failed translation must not abort the job.
            return ""

    def utc_to_local(self, utc_timestamp, local_tz):
        """Convert a naive UTC datetime to an aware datetime in ``local_tz``.

        Returns None when either argument is None or the conversion fails.
        """
        if utc_timestamp is None or local_tz is None:
            return None
        try:
            local_dt = utc_timestamp.replace(tzinfo=pytz.utc).astimezone(local_tz)
            return local_tz.normalize(local_dt)
        except Exception:
            return None

    def get_time_of_day(self, hour):
        """Return the label of ``self.time_of_day`` containing ``hour``, else None."""
        for label, hours in self.time_of_day.items():
            if hour in hours:
                return label
        return None

    def _get_tz_dict(self):
        """Return the country -> time zone name mapping, loading it on first use.

        The file is parsed once per job instance instead of once per input
        line. It is read as JSON, which is the format it is written in.
        """
        tz_dict = getattr(self, '_tz_dict', None)
        if tz_dict is None:
            with open(str(self.options.tzFile), 'r') as f:
                tz_dict = json.load(f)
            self._tz_dict = tz_dict
        return tz_dict

    def _parse_utc(self, utc_time):
        """Parse ``utc_time`` with the first matching format; None if none match."""
        for fmt in self._UTC_FORMATS:
            try:
                return datetime.strptime(utc_time, fmt)
            except ValueError:
                continue
        return None

    def mapper_translate(self, _, line_):
        """Emit ``(user_id, columns)`` for one tab-separated input row.

        ``columns`` holds the ``repr()`` of: utc_time, traid, artname,
        something, tracname, session_window, gender, age, country,
        registered, reg_date, local_tz, local_time, time_of_day, day_of_week.
        The last four are None when the country has no usable time zone or
        the timestamp cannot be parsed.

        Nothing is emitted for rows that are too short, for the header row
        (first column equal to "userid"), or for rows with too few columns.
        """
        if len(line_) <= self.min_line_length:
            return

        fields = line_.split("\t")
        user_id = fields[0]
        if user_id == "userid":
            # Header row.
            return

        if len(fields) < self._EXPECTED_FIELDS:
            # Report and skip instead of failing the whole task on one bad row.
            sys.stderr.write(" Malformed row (too few columns):\t" + repr(line_) + "\n")
            return

        value = [v.replace('"', '').strip() for v in fields[1:self._EXPECTED_FIELDS]]
        (utc_time, traid, artname, something, tracname, session_window,
         gender, age, country, registered, reg_date) = value

        # Drop the last character of the timestamp before parsing.
        # NOTE(review): presumably a trailing "Z" -- confirm against the data.
        utc_time = utc_time[:-1]

        if country == "":
            country = "United States"

        # A missing or unreadable tz file is a configuration error, so it is
        # deliberately allowed to propagate from here.
        tz_dict = self._get_tz_dict()
        try:
            local_tz = pytz.timezone(tz_dict[country])
        except Exception:
            # Unknown country, or an empty/invalid zone name in the lookup.
            local_tz = None

        utc = self._parse_utc(utc_time)
        if utc is None:
            sys.stderr.write(" UTC Exception:\t" + repr(str(utc_time)) + " COUNTRY:  "    + repr(str(country)) +"\n")

        # utc_to_local() already yields the local wall-clock time; adding
        # utcoffset() on top of it would apply the offset twice.
        local_time = self.utc_to_local(utc, local_tz)

        if local_time is not None:
            day_of_week = self.day_of_week[local_time.weekday()]
            time_of_day = self.get_time_of_day(local_time.hour)
        else:
            day_of_week = None
            time_of_day = None

        value_ = [repr(x) for x in (utc_time, traid, artname, something, tracname,
                                    session_window, gender, age, country, registered,
                                    reg_date, local_tz, local_time, time_of_day,
                                    day_of_week)]

        yield user_id, value_

    def steps(self):
        """Return the single map-only step together with its Hadoop job settings."""
        # NOTE(review): the reduce heap (-Xmx61440m) is larger than the reduce
        # container size (40960 MB) -- confirm these values are intended.
        jobconf = {
            'mapreduce.job.maps': self.options.num_mappers,
            'mapreduce.job.reduces': self.options.num_reducers,
            'mapreduce.partition.keypartitioner.options': '-k1',
            'mapreduce.output.key.comparator.class': 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
            'mapreduce.text.key.comparator.options': '-k1',
            'mapreduce.map.memory.mb':40960,
            'mapreduce.reduce.memory.mb':40960,
            'mapreduce.map.java.opts':'-Xmx30720m',
            'mapreduce.reduce.java.opts':'-Xmx61440m'
        }

        return [MRStep(mapper=self.mapper_translate,
                       jobconf=jobconf)]
       

# Script entry point: hand control to the job's run() class method.
if __name__ == "__main__":
    MRTranslate.run()

In [None]:
source_file = '/data_data/session_length/data/Session_data.csv'

In [None]:
!tail {source_file}

In [None]:
import MRTranslate

try:
    # Python 3: reload() is no longer a builtin.
    from importlib import reload
except ImportError:
    # Python 2: keep using the builtin reload().
    pass

# Pick up the MRTranslate.py that was just (re)written by the %%writefile cell.
reload(MRTranslate)


def myJob(source_file):
    """Run the MRTranslate job on ``source_file`` and return its output.

    Returns a new list of ``[key, value]`` pairs on every call, so repeated
    runs no longer pile up in a shared module-level list.
    """
    mr_job = MRTranslate.MRTranslate(args=[source_file])
    with mr_job.make_runner() as runner:
        runner.run()
        # cat_output()/parse_output() replace the deprecated
        # stream_output()/parse_output_line() pair.
        return [[key, value]
                for key, value in mr_job.parse_output(runner.cat_output())]

In [None]:
output_list = myJob(source_file)
output_ALL_df   = pd.Series(output_list)

In [None]:
output_ALL_df.head()

In [None]:
!rm session_with_timezone_u4.csv

In [None]:
output_ALL_df.to_csv("session_with_timezone_ALL.csv", sep='@' , encoding='utf-8')

In [None]:
!head session_with_timezone_u4.csv

In [None]:
!cat data/Session_data_100k.csv | grep user_000004

In [None]:
!tail data/Session_data_u4.csv

In [None]:
!python synNet/EDA/num_nodes.py -r hadoop "/media/notebooks/HW7/synnet/synnet.txt" --output-dir={NUM_NODES_OUT} --no-output

#    -cmdenv tzFile='country_capital_city_tz.json' \
#    -cmdenv PATH={PATH}


In [None]:
!rm /data_data/session_length/session_data_1k.csv
!rm -R /tmp/MRtranslate*
!python MRTranslate.py /data_data/session_length/data/Session_data_1k.csv   --tzFile=/data_data/session_length/country_capital_city_tz.json --tmp_dir=/data_data/session_length/hadoop/tmp  > session_data_1k_is_holiday.tsv

In [None]:
!head session_data_1k_is_holiday.tsv

In [None]:
!head -n 5 data/Session_data_1k.csv

In [None]:
!ls -alrth  data/Session_data_*

In [None]:
# A "!export" only changes the environment of the subshell spawned for that
# one line, so later cells never see it. Extend the kernel's own PATH instead;
# entries already present are not added again, so the cell can be re-run.
for hadoop_dir in ("/usr/local/hadoop/bin", "/usr/local/hadoop/sbin"):
    if hadoop_dir not in os.environ.get("PATH", "").split(":"):
        os.environ["PATH"] = os.environ.get("PATH", "") + ":" + hadoop_dir

In [None]:
# NOTE(review): "!export" presumably affects only the subshell spawned for
# this line, so later cells will not see HADOOP_opts. The value "/path/" also
# looks like a placeholder -- confirm the intended variable name and directory.
!export HADOOP_opts="-Dhadoop.tmp.dir=/path/"

In [None]:
TRANSLATE_PY ='/data_data/session_length/MRTranslate.py'

In [None]:
JAR_FILE = '/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.0.1.jar'

In [None]:
USER = !whoami
USER = 'w210'
OUTPUT_PATH_BASE = '/user/{USER}'.format(USER=USER)

In [None]:
# Substitute the base path; without .format() the literal text
# "{OUTPUT_PATH_BASE}" would remain in the value.
OUTPUT_DIR = "{OUTPUT_PATH_BASE}/data/output".format(OUTPUT_PATH_BASE=OUTPUT_PATH_BASE)

In [None]:
# Locations of the sampled session files and of the full data set, all
# rooted at OUTPUT_PATH_BASE.
TEST_1k, TEST_10k, TEST_100k, TEST_1M, FULL_DATA = (
    path_template.format(OUTPUT_PATH_BASE=OUTPUT_PATH_BASE)
    for path_template in (
        "{OUTPUT_PATH_BASE}/data/Session_data_1k.csv",
        "{OUTPUT_PATH_BASE}/data/Session_data_10k.csv",
        "{OUTPUT_PATH_BASE}/data/Session_data_100k.csv",
        "{OUTPUT_PATH_BASE}/data/Session_data_1M.csv",
        "{OUTPUT_PATH_BASE}/data/Session_data.csv",
    )
)

In [None]:
!hdfs dfs -chgrp hadoop /user/w210/data/Session*

In [None]:
!hdfs dfs -ls /user/w210/data/Session*

In [None]:
!echo {TEST_1k}

In [None]:
!hdfs dfs -ls {TEST_1k}

In [None]:
!ls -alrth country_capital*

In [None]:
TZ_FILE='/user/w210/data/country_capital_city_tz.json'

In [None]:
INPUT_FILE='/user/w210/data/Session_data_1M.csv'

In [None]:
TMP_DIR='/user/hduser/tmp'

In [None]:
!ls -alrth data

In [None]:
#!hdfs dfs -rm /user/hduser/Session_data*.csv
#!hdfs dfs -copyFromLocal data/Session_data*.csv /user/hduser/
#!hdfs dfs -ls /user/hduser

In [None]:
!hdfs dfs -rm -r /user/hduser/output*
!hdfs dfs -ls /user/hduser

In [None]:
#!hdfs dfs -rm -r /user/hduser/output_all
#!hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.0.1.jar \
#    -D mapreduce.job.maps=10     \
#    -D hadoop.tmp.dir='/data_data/session_length/hadoop/tmp' \
#    -files MRTranslate.py,country_capital_city_tz.json\
#    -input '/user/hduser/Session_data.csv' \
#    -output '/user/hduser/output_all' \
#    -numReduceTasks 10 \
#    -cmdenv tzFile='country_capital_city_tz.json' \
#    -cmdenv PATH={PATH}

In [None]:
!hdfs dfs -ls /user/hduser/output_all/ 

In [None]:
!hdfs dfs -tail /user/hduser/output_all/part-00001

In [None]:
!head data/Session_data.csv

In [None]:
#!hdfs fs -rm -r /user/hduser/output_5
#!hdfs jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.0.1.jar     \
#    -D mapreduce.job.maps=10     \
#    -D hadoop.tmp.dir='/data_data/session_length/hadoop/tmp' \
#    -files MRTranslate.py,country_capital_city_tz.json    \
#    -input '/user/hduser/Session_data.csv'  \
#    -output '/user/hduser/output_5'     \
#    -numReduceTasks 10     \
#    -cmdenv tzFile='country_capital_city_tz.json'     \
#    -cmdenv PATH={PATH}

In [None]:
!hdfs dfs -ls /user/hduser/ 

In [None]:
!hdfs dfs -tail /user/hduser/output_5/part-00000

In [None]:
!head session_data_1K_is_holiday.tsv

### Save as file (via toPandas())

In [None]:
session_with_holiday_DF = spark.createDataFrame(output_ALL_df)

In [None]:
session_with_holiday_DF.show()

In [None]:
# Arrow-based conversion is switched off only for this export. The finally
# clause switches it back on even if toPandas() or to_csv() raises.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
try:
    session_with_holiday_DF.toPandas().to_csv("session_with_timezone.csv", sep='\t', encoding='utf-8',index=False)
finally:
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [None]:
!head session_with_holiday_all.csv