In [201]:
import os
import sys
from subprocess import Popen, PIPE
import json
import re
import datetime

In [202]:
def setEnv(spark_home):    
    if not spark_home:
     raise ValueError('SPARK_HOME environment variable is not set')
    os.environ['SPARK_HOME']=spark_home
    sys.path.insert(0, os.path.join(spark_home, 'python'))
    sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

In [203]:
def getSC(master, appName='test', cores='10', mem='10g'):
    if not master:
        raise ValueError('master is not set')
    sparkEnv={"spark.cores.max":cores,
              "spark.driver.memory":"5g",
              "spark.executor.memory":mem}
    conf = SparkConf()
    conf.setMaster(master)
    conf.setAppName(appName)
    conf.setAll([(x,sparkEnv[x]) for x in sparkEnv])
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)
    
    return sc, sqlContext

In [204]:
setEnv('/opt/spark-1.4.3-bin-cdh4')
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf 
from pyspark.sql.types import *
from py4j.java_gateway import Py4JJavaError

In [208]:
sc, sqlContext=getSC('spark://bi-hd03.vpon.idc:7077','hive',cores='80',mem='10g')

## Create a hive table 

In [38]:
sql='select * from ad_tw limit 1'
process = Popen(['hive','-e',sql],stdout=PIPE, stderr=PIPE)

In [39]:
stdout, stderr = process.communicate() 

In [40]:
process.poll()

0

In [51]:
def create_hive_table(sql):
    process = Popen(['hive','-e',sql],stdout=PIPE, stderr=PIPE)
    stdout, stderr = process.communicate() 
    if process.poll() == 0:
        return 0
    else:
        return stderr 

In [61]:
def load_file_hive_table(tablename, path):
    process = Popen(['hive','-e','load data local inpath "%s" into table %s' % (path,tablename)],stdout=PIPE, stderr=PIPE)
    stdout, stderr = process.communicate() 
    if process.poll() == 0:
        return 0
    else:
        return stderr 


In [209]:
def select_hive_table(sql):
    process = Popen(['hive','-e',sql],stdout=PIPE, stderr=PIPE)
    stdout, stderr = process.communicate() 
    if process.poll() == 0:
        return stdout
    else:
        return stderr 
    

In [None]:
import pandas as pd
df = pd.read_csv('/home/bryan/workspace/SparkTutorial/data/titanic.txt', quotechar='"', names=['id','survival','pclass','name','sex','age','sibsp','parch','ticket','fare','cabin','embarked'])
df.to_csv('/home/bryan/workspace/SparkTutorial/data/titanic2.txt', sep=':', header=False, index=Fa)

In [127]:
sql = """create table temp.titanic 
(
id  int,
survival int,
pclass int,
name String,
sex String,
age int,
sibsp int,
parch int,
ticket  String,
fare String,
cabin String,
embarked String
)ROW FORMAT DELIMITED
FIELDS TERMINATED BY ':'
LINES TERMINATED BY '\n'
"""

create_hive_table(sql)

0

In [128]:
load_file_hive_table('temp.titanic', '/home/bryan/workspace/SparkTutorial/data/titanic2.txt')

0

In [129]:
select_hive_table('select * from temp.titanic limit 10')

'1\t0\t3\tBraund, Mr. Owen Harris\tmale\tNULL\t1\t0\tA/5 21171\t7.25\t\tS\n2\t1\t1\tCumings, Mrs. John Bradley (Florence Briggs Thayer)\tfemale\tNULL\t1\t0\tPC 17599\t71.2833\tC85\tC\n3\t1\t3\tHeikkinen, Miss. Laina\tfemale\tNULL\t0\t0\tSTON/O2. 3101282\t7.925\t\tS\n4\t1\t1\tFutrelle, Mrs. Jacques Heath (Lily May Peel)\tfemale\tNULL\t1\t0\t113803\t53.1\tC123\tS\n5\t0\t3\tAllen, Mr. William Henry\tmale\tNULL\t0\t0\t373450\t8.05\t\tS\n6\t0\t3\tMoran, Mr. James\tmale\tNULL\t0\t0\t330877\t8.4583\t\tQ\n7\t0\t1\tMcCarthy, Mr. Timothy J\tmale\tNULL\t0\t0\t17463\t51.8625\tE46\tS\n8\t0\t3\tPalsson, Master. Gosta Leonard\tmale\tNULL\t3\t1\t349909\t21.075\t\tS\n9\t1\t3\tJohnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)\tfemale\tNULL\t0\t2\t347742\t11.1333\t\tS\n10\t1\t2\tNasser, Mrs. Nicholas (Adele Achem)\tfemale\tNULL\t1\t0\t237736\t30.0708\t\tC\n'

## interact with hive 

## Data Process

In [210]:
df = sqlContext.sql("select * from temp.titanic")

In [131]:
df.show()

+--+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|id|survival|pclass|                name|   sex| age|sibsp|parch|          ticket|   fare|cabin|embarked|
+--+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
| 1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25|     |       S|
| 2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
| 3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925|     |       S|
| 4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
| 5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05|     |       S|
| 6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|          330877| 8.4583|     |       Q|
| 7|       0|     1|McCarthy, Mr. Tim...|  mal

In [132]:
df.columns

['id',
 'survival',
 'pclass',
 'name',
 'sex',
 'age',
 'sibsp',
 'parch',
 'ticket',
 'fare',
 'cabin',
 'embarked']

In [135]:
# create dimension table
part1 = sqlContext.sql("select id, survival, pclass from temp.titanic")

In [136]:
part2 = sqlContext.sql("select id, sex, age from temp.titanic")

In [138]:
# register temp table
part1.registerTempTable('part1')
part2.registerTempTable('part2')

In [141]:
# use as a Hive table
sqlContext.sql("select * from part1").show()

+--+--------+------+
|id|survival|pclass|
+--+--------+------+
| 1|       0|     3|
| 2|       1|     1|
| 3|       1|     3|
| 4|       1|     1|
| 5|       0|     3|
| 6|       0|     3|
| 7|       0|     1|
| 8|       0|     3|
| 9|       1|     3|
|10|       1|     2|
|11|       1|     3|
|12|       1|     1|
|13|       0|     3|
|14|       0|     3|
|15|       0|     3|
|16|       1|     2|
|17|       0|     3|
|18|       1|     2|
|19|       0|     3|
|20|       1|     3|
+--+--------+------+



In [143]:
sqlContext.sql("select * from part1 a join part2 b on a.id = b.id ").show()

+---+--------+------+---+------+----+
| id|survival|pclass| id|   sex| age|
+---+--------+------+---+------+----+
|631|       1|     1|631|  male|  80|
|831|       1|     3|831|female|  15|
| 31|       0|     1| 31|  male|  40|
|231|       1|     1|231|female|  35|
|431|       1|     1|431|  male|  28|
|632|       0|     3|632|  male|  51|
|832|       1|     2|832|  male|   0|
| 32|       1|     1| 32|female|null|
|232|       0|     3|232|  male|  29|
|432|       1|     3|432|female|null|
|633|       1|     1|633|  male|  32|
|833|       0|     3|833|  male|null|
| 33|       1|     3| 33|female|null|
|233|       0|     2|233|  male|  59|
|433|       1|     2|433|female|  42|
|634|       0|     1|634|  male|null|
|834|       0|     3|834|  male|  23|
| 34|       0|     2| 34|  male|  66|
|234|       1|     3|234|female|   5|
|434|       0|     3|434|  male|  17|
+---+--------+------+---+------+----+



## use sql as rdd

In [144]:
part1.rdd.take(5)

[Row(id=1, survival=0, pclass=3),
 Row(id=2, survival=1, pclass=1),
 Row(id=3, survival=1, pclass=3),
 Row(id=4, survival=1, pclass=1),
 Row(id=5, survival=0, pclass=3)]

In [145]:
part1.map(lambda x: x[0]).take(5)part

[1, 2, 3, 4, 5]

In [155]:
# RDD operate
part1.map(lambda x: (x[2],1)).reduceByKey(lambda x, y: x+y).collect()

[(2, 184), (1, 216), (3, 491)]

In [158]:
# sql operate
part1.groupby('pclass').count().show()

+------+-----+
|pclass|count|
+------+-----+
|     1|  216|
|     2|  184|
|     3|  491|
+------+-----+



In [166]:
part1.groupBy('pclass').agg({'pclass':'count'}).show()

+------+-------------+
|pclass|COUNT(pclass)|
+------+-------------+
|     1|          216|
|     2|          184|
|     3|          491|
+------+-------------+



In [168]:
part1.agg({'pclass':'count'}).show()

+-------------+
|COUNT(pclass)|
+-------------+
|          891|
+-------------+



In [169]:
part2.agg({'age':'avg'}).show()

+------------------+
|          AVG(age)|
+------------------+
|29.679271708683473|
+------------------+



In [171]:
part2.groupBy('sex').agg({'age':'avg'}).show()

+------+------------------+
|   sex|          AVG(age)|
+------+------------------+
|female|27.904214559386972|
|  male| 30.70198675496689|
+------+------------------+



## RDD to Sql

In [173]:
part2_rdd = part2.rdd

In [174]:
type(part2_rdd)

pyspark.rdd.PipelinedRDD

In [176]:
part2_rdd.take(5)

[Row(id=1, sex=u'male', age=22),
 Row(id=2, sex=u'female', age=38),
 Row(id=3, sex=u'female', age=26),
 Row(id=4, sex=u'female', age=35),
 Row(id=5, sex=u'male', age=35)]

In [178]:
# replace male to 0, female to 1
part2_rdd.map(lambda x: [i for i in x]).take(5)

[[1, u'male', 22],
 [2, u'female', 38],
 [3, u'female', 26],
 [4, u'female', 35],
 [5, u'male', 35]]

In [182]:
coding = {'male':0,
          'female':1}
def recode(x):
    return coding[x]


In [189]:
part2_rdd = part2_rdd.map(lambda x: [x[0], recode(x[1]), x[2]]).take(5)

In [195]:
from pyspark.sql.types import *
     
fields = [StructField('id',IntegerType(),True ), \
          StructField('gender', IntegerType(), True), \
          StructField('age', IntegerType(), True)]
    
schema = StructType(fields)
    
part2_rdd_df = sqlContext.createDataFrame(part2_rdd, schema)

In [197]:
type(part2_rdd_df)

pyspark.sql.dataframe.DataFrame

## User define Function

In [198]:
from pyspark.sql.functions import udf 
sqlContext.registerFunction("recode", lambda x: recode(x), IntegerType())

In [200]:
sqlContext.sql("select sex, recode(sex) from temp.titanic").show()

+------+---+
|   sex|_c1|
+------+---+
|  male|  0|
|female|  1|
|female|  1|
|female|  1|
|  male|  0|
|  male|  0|
|  male|  0|
|  male|  0|
|female|  1|
|female|  1|
|female|  1|
|female|  1|
|  male|  0|
|  male|  0|
|female|  1|
|female|  1|
|  male|  0|
|  male|  0|
|female|  1|
|female|  1|
+------+---+



## save to Hive table

In [211]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- survival: integer (nullable = true)
 |-- pclass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)



In [212]:
sql = """create table temp.titanic2
(
id  int,
survival int,
pclass int,
name String,
sex String,
age int,
sibsp int,
parch int,
ticket  String,
fare String,
cabin String,
embarked String
)ROW FORMAT DELIMITED
FIELDS TERMINATED BY ':'
LINES TERMINATED BY '\n'
"""

create_hive_table(sql)

0

In [215]:
df = df.repartition(10)
sqlContext.sql("use temp")
df.write.insertInto('titanic2')

In [216]:
!hive -e "select * from temp.titanic2 limit 10"

Logging initialized using configuration in file:/etc/hive/conf/hive-log4j.properties
Hive history file=/tmp/bryan/hive_job_log_2c59fb2c-0d09-4d2e-9995-8271c998415f_1412742059.txt
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/camus-example-0.1.0-SNAPSHOT-shaded.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
OK
121	0	2	Hickman, Mr. Stanley George	male	21	2	0	S.O.C. 14879	73.5		S
603	0	1	Harrington, Mr. Charles H	male	NULL	0	0	113796	42.4		S
317	1	2	Kantor, Mrs. Sinai (Miriam Sternin)	female	24	1	0	244367	26.0		S
841	0	3	Alhomaki, Mr. Ilmari Rudolf	male	20	0	0	SOTON/O2 3101287	7.925		S
83	1	3	McDermott, Miss. Brigdet Delia	female	NULL	0	0	330932	7.7875		Q
130	0	3	Ekstrom, Mr. Johan	male	45	0	0	347061	6.975		S
239	0	2	Pengelly, Mr.

In [217]:
sc.stop()