# Hive ingestion

Load the raw features and target CSV files into tables in the `hackathon` Hive database.

In [None]:
spark.sql('use hackathon')


def ingest_csv(path, table):
    """Load a headered CSV with inferred column types and save it as a Hive table.

    The table is written with mode 'overwrite' so this cell can be re-run:
    the default save mode raises when the table already exists.

    Args:
        path: location of the CSV file (first line is the header).
        table: fully qualified Hive table name to (re)create.

    Returns:
        The DataFrame that was loaded from the CSV.
    """
    df = spark.read.csv(path, header=True, inferSchema=True)
    df.printSchema()
    df.write.mode('overwrite').saveAsTable(table)
    return df


features = ingest_csv('/tmp/features.csv', 'hackathon.features')
target = ingest_csv('/tmp/target.csv', 'hackathon.target')

# Group and Join

Average the feature rows per account, then join the result with the target labels to produce the `churn` table.

In [2]:
spark.sql('use hackathon')
# Read through the same `spark` session that ran `use hackathon` above.
features = spark.sql('select * from features')
# Every feature column except the period columns year/month, selected by name
# rather than by position (the old `[2:]` slice silently assumed those two
# columns come first). Column order is preserved.
features_colnames = [name for name in features.schema.names if name not in ('year', 'month')]
features_colnames

['user_account_id',
 'user_lifetime',
 'user_intake',
 'user_no_outgoing_activity_in_days',
 'user_account_balance_last',
 'user_spendings',
 'user_has_outgoing_calls',
 'user_has_outgoing_sms',
 'user_use_gprs',
 'user_does_reload',
 'reloads_inactive_days',
 'reloads_count',
 'reloads_sum',
 'calls_outgoing_count',
 'calls_outgoing_spendings',
 'calls_outgoing_duration',
 'calls_outgoing_spendings_max',
 'calls_outgoing_duration_max',
 'calls_outgoing_inactive_days',
 'calls_outgoing_to_onnet_count',
 'calls_outgoing_to_onnet_spendings',
 'calls_outgoing_to_onnet_duration',
 'calls_outgoing_to_onnet_inactive_days',
 'calls_outgoing_to_offnet_count',
 'calls_outgoing_to_offnet_spendings',
 'calls_outgoing_to_offnet_duration',
 'calls_outgoing_to_offnet_inactive_days',
 'calls_outgoing_to_abroad_count',
 'calls_outgoing_to_abroad_spendings',
 'calls_outgoing_to_abroad_duration',
 'calls_outgoing_to_abroad_inactive_days',
 'sms_outgoing_count',
 'sms_outgoing_spendings',
 'sms_outgoing_

In [3]:
# One row per account: average every numeric column of the feature rows.
# Spark names the resulting columns avg(<column>).
churn = features.groupBy('user_account_id').avg()
churn.show(1)

+---------------+---------+----------+--------------------+------------------+----------------+--------------------------------------+------------------------------+-------------------+----------------------------+--------------------------+------------------+---------------------+--------------------------+------------------+----------------+-------------------------+-----------------------------+----------------------------+---------------------------------+--------------------------------+---------------------------------+----------------------------------+--------------------------------------+-------------------------------------+------------------------------------------+-----------------------------------+---------------------------------------+--------------------------------------+-------------------------------------------+-----------------------------------+---------------------------------------+--------------------------------------+----------------------------------------

In [6]:
# Averages of the period columns and of the grouping key itself carry no information.
churn = churn.drop('avg(year)', 'avg(month)', 'avg(user_account_id)')

In [5]:
# Re-read the label table from Hive and preview a couple of rows.
target = sqlContext.table('target')
target.show(2)

+----+-----+---------------+-----+
|year|month|user_account_id|churn|
+----+-----+---------------+-----+
|2013|    9|         376265|    0|
|2013|    9|        1151965|    0|
+----+-----+---------------+-----+
only showing top 2 rows



In [7]:
# Attach the target columns to each aggregated account; the inner join keeps
# only accounts present on both sides.
churn = churn.join(target, on='user_account_id', how='inner')
churn.show(1)

+---------------+------------------+----------------+--------------------------------------+------------------------------+-------------------+----------------------------+--------------------------+------------------+---------------------+--------------------------+------------------+----------------+-------------------------+-----------------------------+----------------------------+---------------------------------+--------------------------------+---------------------------------+----------------------------------+--------------------------------------+-------------------------------------+------------------------------------------+-----------------------------------+---------------------------------------+--------------------------------------+-------------------------------------------+-----------------------------------+---------------------------------------+--------------------------------------+-------------------------------------------+-----------------------+--------------

In [8]:
# Restore the original feature names by stripping the "avg(...)" wrapper that
# groupBy().avg() put around each column. This works by name rather than by
# position, so it stays correct even if avg() skipped a column. It also avoids
# the Python-2-only `xrange` and the bare `reduce` (which needs functools on Py3).
grouped_colnames = churn.schema.names
for grouped_name in grouped_colnames:
    if grouped_name.startswith('avg(') and grouped_name.endswith(')'):
        churn = churn.withColumnRenamed(grouped_name, grouped_name[len('avg('):-1])

assert not [name for name in churn.schema.names if name.startswith('avg(')], 'unrenamed avg columns remain'

In [9]:
# Inspect the final churn schema after the rename step (column names and types).
churn.printSchema()

root
 |-- user_account_id: integer (nullable = true)
 |-- user_lifetime: double (nullable = true)
 |-- user_intake: double (nullable = true)
 |-- user_no_outgoing_activity_in_days: double (nullable = true)
 |-- user_account_balance_last: double (nullable = true)
 |-- user_spendings: double (nullable = true)
 |-- user_has_outgoing_calls: double (nullable = true)
 |-- user_has_outgoing_sms: double (nullable = true)
 |-- user_use_gprs: double (nullable = true)
 |-- user_does_reload: double (nullable = true)
 |-- reloads_inactive_days: double (nullable = true)
 |-- reloads_count: double (nullable = true)
 |-- reloads_sum: double (nullable = true)
 |-- calls_outgoing_count: double (nullable = true)
 |-- calls_outgoing_spendings: double (nullable = true)
 |-- calls_outgoing_duration: double (nullable = true)
 |-- calls_outgoing_spendings_max: double (nullable = true)
 |-- calls_outgoing_duration_max: double (nullable = true)
 |-- calls_outgoing_inactive_days: double (nullable = true)
 |-- ca

In [10]:
# Replace any previous churn table. IF EXISTS keeps a run against a fresh
# database from failing when there is nothing to drop. The database-qualified
# name does not depend on an earlier `use hackathon` having run.
sqlContext.sql('drop table if exists hackathon.churn')
churn.write.saveAsTable('hackathon.churn')

In [12]:
# test join and group: for one account, recompute two averages directly from the
# raw features and check that they match the aggregated, renamed churn row, and
# that the churn label was carried over from target.
CHECK_ACCOUNT_ID = 931806
CHECK_COLUMNS = ('user_lifetime', 'user_no_outgoing_activity_in_days')

expected = features.filter(features.user_account_id == CHECK_ACCOUNT_ID)\
  .agg(dict((col, 'avg') for col in CHECK_COLUMNS))
expected.show()
target_rows = target.filter(target.user_account_id == CHECK_ACCOUNT_ID)
target_rows.show()

expected_row = expected.first()
label_row = target_rows.first()
actual_row = churn.filter(churn.user_account_id == CHECK_ACCOUNT_ID).first()
assert actual_row is not None, 'account %d missing from churn' % CHECK_ACCOUNT_ID

for col in CHECK_COLUMNS:
    want = expected_row['avg(%s)' % col]
    got = actual_row[col]
    if want is None or got is None:
        assert want is got, (col, got, want)
    else:
        # Distributed sums can differ in the last bits, so compare with a tolerance.
        assert abs(got - want) <= 1e-9 * max(1.0, abs(want)), (col, got, want)

assert actual_row['churn'] == label_row['churn'], (actual_row['churn'], label_row['churn'])

+------------------+--------------------------------------+
|avg(user_lifetime)|avg(user_no_outgoing_activity_in_days)|
+------------------+--------------------------------------+
|             572.0|                    2.6666666666666665|
+------------------+--------------------------------------+

+----+-----+---------------+-----+
|year|month|user_account_id|churn|
+----+-----+---------------+-----+
|2013|    9|         931806|    0|
+----+-----+---------------+-----+

