In [None]:
from pyspark import SparkConf, SparkContext
from functools import reduce
import os
# Run HDFS operations as this Hadoop user.
# NOTE(review): 'hdfs' is conventionally the HDFS superuser on CDH clusters; consider a
# less-privileged account if the cluster permits it.
os.environ['HADOOP_USER_NAME'] = 'hdfs'

# spark-submit arguments for the driver JVM. Only the master is set here. To size a YARN
# job, add e.g. '--num-executors 4 --executor-memory 4g' before 'pyspark-shell' (in local
# mode, use '--driver-memory' instead). To run on YARN rather than locally, replace
# '--master local' with '--master yarn' (older Spark: 'yarn-client') and set YARN_CONF_DIR
# below to the cluster's Hadoop client configuration directory.
# NOTE: these variables are read only when the JVM gateway is first launched. After
# changing them, restart the kernel; stopping and recreating the SparkContext is not enough.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local pyspark-shell"

# Empty for local mode; for YARN, point this at the Hadoop/YARN config directory,
# e.g. os.environ['YARN_CONF_DIR'] = '/path/to/hdfs_configs'
os.environ['YARN_CONF_DIR'] = ''

# Worker nodes must run a Python version compatible with the driver's. If the default
# Python on the nodes differs, point PYSPARK_PYTHON at the right interpreter, e.g.
# os.environ['PYSPARK_PYTHON'] = '/path/to/python'

# Stop any SparkContext left over from an earlier run of this notebook. Only one active
# SparkContext is allowed per process, so it must be stopped before a new one is created.
try:
    sc.stop()
except NameError:
    # First run in this kernel: no `sc` has been defined yet.
    pass

APP_NAME = 'PysparkDemo.ipynb-sxie'
conf = SparkConf().setAppName(APP_NAME)
sc = SparkContext(conf=conf)

In [None]:
# Read file from HDFS [avalanche_cdh5_ha]. zipWithIndex pairs every line with its
# 0-based position, so the header (position 0) can be told apart from data rows later.
CREDIT_CSV_PATH = "/data/demo_data/credit_orig.csv"
credit_lines = sc.textFile(CREDIT_CSV_PATH)
rawRDD = credit_lines.zipWithIndex()

In [None]:
# Parse the CSV: extract header, add a row index column, [assuming all numeric values for data]
def parseStringIntoList(s, t):
    """Turn one CSV data line into a list of numbers prefixed with its row index.

    s: raw text of the line; every comma-separated field must parse as a float.
    t: the line's 0-based position in the file (as produced by zipWithIndex in this
       notebook). The header occupies position 0, so the first data line gets RowIdx 0.

    Returns [t - 1, float(field_1), float(field_2), ...].
    Raises ValueError if any field is not numeric (for example, an empty field).
    """
    return [t - 1] + [float(field) for field in s.split(",")]
# Column names: the synthetic row index followed by the CSV's own header fields, which are
# assumed to be on the file's first line. Names are stripped of surrounding whitespace so a
# header such as "Column1, Column2" still yields SQL-usable names. first() raises a clear
# error if the file is empty.
header = ["RowIdx"] + [name.strip() for name in rawRDD.first()[0].split(",")]
# Data rows are every line after position 0. Whitespace-only lines are skipped because they
# have no numeric fields and would otherwise make float('') raise ValueError.
data = (rawRDD
        .filter(lambda p: p[1] > 0 and p[0].strip() != "")
        .map(lambda p: parseStringIntoList(p[0], p[1])))

In [None]:
# Peek at the first five parsed rows of the data RDD (the returned list is displayed).
data.take(num=5)

In [None]:
# SQLContext is the entry point for building Spark DataFrames and running SparkSQL
# queries on top of the existing SparkContext `sc`.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=sc)

In [None]:
# Create a Spark DataFrame whose columns are named after the header extracted above.
# createDataFrame infers a schema with positional names (_1, _2, ...); toDF then renames
# every column in a single step.
raw_df = sqlContext.createDataFrame(data)
old_columns = raw_df.schema.names
if len(old_columns) != len(header):
    # Fail loudly instead of leaving columns unnamed or hitting an IndexError.
    raise ValueError("Header has %d names but the data has %d columns"
                     % (len(header), len(old_columns)))
formatted_df = raw_df.toDF(*header)

In [None]:
# Print the first five rows of the named DataFrame as a text table.
formatted_df.show(n=5)

In [None]:
# Register the DataFrame as a temporary SQL table named "credit" so SparkSQL queries can
# reference it. createOrReplaceTempView (Spark 2.0+) supersedes the deprecated
# registerTempTable; fall back to the old method on Spark 1.x, which lacks the new one.
if hasattr(formatted_df, "createOrReplaceTempView"):
    formatted_df.createOrReplaceTempView("credit")
else:
    formatted_df.registerTempTable("credit")

In [None]:
# Select every row whose Column1 exceeds 60.0, then print the leading rows
# (show() prints 20 by default).
column1_filter_query = "select * from credit where Column1>60.0"
result = sqlContext.sql(column1_filter_query)
result.show()

In [None]:
# Setup to create some basic plots
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
def get_filtered_data(res, col):
    """Collect one column of Spark DataFrame `res` to the driver as a flat list of values."""
    return res.select(col).rdd.flatMap(list).collect()

# Collect both plot columns in a single Spark job so each (Column3, Column4) pair comes
# from the same row. Two separate collects would recompute the uncached query twice, and
# Spark does not guarantee row order across jobs without an explicit ordering.
plot_rows = result.select("Column3", "Column4").collect()
filt_col3 = [row[0] for row in plot_rows]
filt_col4 = [row[1] for row in plot_rows]

In [None]:
# Scatter plot of Column4 against Column3 for the rows returned by the Column1 > 60.0 query.
fig, ax = plt.subplots()
ax.scatter(filt_col3, filt_col4)
ax.set_xlabel("Column3")
ax.set_ylabel("Column4")
ax.set_title("Column4 vs Column3 (Column1 > 60.0)")
plt.show()