# Shared Data Context

The intent of this notebook is to provide examples of how data scientists can use object storage, and more specifically Ceph object storage, in much the same way they are accustomed to interacting with Amazon Simple Storage Service (S3). This is possible because Ceph's object storage gateway offers a high degree of compatibility with the Amazon S3 API.

# Working with Boto

Boto is an integrated interface to current and future infrastructural services offered by Amazon Web Services. Among the services it provides interfaces for is Amazon S3. For lightweight analysis of data using Python tools like NumPy or pandas, it is handy to interact with data stored in object storage using pure Python. This is where Boto shines. The base-notebook from [radanalyticsio](https://radanalytics.io) doesn't include Boto, but you can install it from the comfort of a notebook using the conda install command below. If you find yourself using Boto frequently, it might be worth modifying [base-notebook](https://github.com/radanalyticsio/base-notebook) and building a custom notebook image that includes Boto.

In [None]:
import sys
!conda install --yes --quiet --prefix {sys.prefix} boto3

In [None]:
import os
import boto3

s3 = boto3.client('s3','us-east-1', endpoint_url= os.environ['RGW_API_ENDPOINT'],
                       aws_access_key_id = os.environ['RGW_USER_USER_KEY'],
                       aws_secret_access_key = os.environ['RGW_USER_SECRET_KEY'])
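
The client cell above raises a bare `KeyError` if any of the three environment variables is unset. The following is a minimal sketch of a pre-flight check, not part of the original notebook. The helper name `rgw_client_kwargs` and the placeholder values are hypothetical, and the helper only assembles keyword arguments, so it runs without a live gateway.

```python
import os

# Environment variable names used throughout this notebook.
REQUIRED_VARS = ("RGW_API_ENDPOINT", "RGW_USER_USER_KEY", "RGW_USER_SECRET_KEY")


def rgw_client_kwargs(env=None):
    """Return keyword arguments for boto3.client('s3', ...) or raise a clear error.

    `env` defaults to os.environ; passing a plain dict makes the helper easy to test.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "endpoint_url": env["RGW_API_ENDPOINT"],
        "aws_access_key_id": env["RGW_USER_USER_KEY"],
        "aws_secret_access_key": env["RGW_USER_SECRET_KEY"],
    }


# Example with placeholder values (hypothetical, not real credentials).
example = rgw_client_kwargs({
    "RGW_API_ENDPOINT": "http://ceph-nano:8000",
    "RGW_USER_USER_KEY": "placeholder-access-key",
    "RGW_USER_SECRET_KEY": "placeholder-secret-key",
})
print(sorted(example))
```

With the variables set, the result could be passed along as `boto3.client('s3', 'us-east-1', **rgw_client_kwargs())`.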


Creating a bucket, uploading an object (put), and listing the bucket.

In [None]:
s3.create_bucket(Bucket='ceph-bucket')
s3.put_object(Bucket='ceph-bucket',Key='object',Body='data')
for key in s3.list_objects(Bucket='ceph-bucket')['Contents']:
    print(key['Key'])
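
One detail worth knowing about the listing loop above: the response from `list_objects` has no `Contents` entry when the bucket is empty, so indexing it directly fails on a fresh bucket. The sketch below shows a tolerant way to pull the keys out. The helper name `keys_from_listing` is hypothetical, and the response dictionaries are hand-written stand-ins rather than real gateway output.

```python
def keys_from_listing(response):
    """Return the object keys from a list_objects-style response dictionary."""
    # 'Contents' is absent when the bucket holds no objects, so default to [].
    return [entry["Key"] for entry in response.get("Contents", [])]


# Hand-written stand-ins for responses (assumed shape, trimmed to a few fields).
empty_response = {"Name": "ceph-bucket", "IsTruncated": False}
populated_response = {
    "Name": "ceph-bucket",
    "IsTruncated": False,
    "Contents": [{"Key": "object", "Size": 4}],
}

print(keys_from_listing(empty_response))
print(keys_from_listing(populated_response))
```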

# Working with Spark

The session created below runs Spark in local mode with three worker threads (`local[3]`).

# TODO: explain local vs oshinko clusters

In [None]:
import os
import pyspark

from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.master("local[3]").getOrCreate()

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.endpoint", os.environ['RGW_API_ENDPOINT'])
hadoopConf.set("fs.s3a.access.key", os.environ['RGW_USER_USER_KEY'])
hadoopConf.set("fs.s3a.secret.key", os.environ['RGW_USER_SECRET_KEY'])
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.connection.ssl.enabled", "false")
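
The configuration cell above disables SSL unconditionally. As a sketch under stated assumptions, the settings can be gathered in one place and the SSL flag derived from the endpoint's scheme. The helper name `s3a_settings` is hypothetical, and treating every endpoint that does not start with `http://` as SSL-enabled is an assumption, not something the original notebook does.

```python
def s3a_settings(endpoint, access_key, secret_key):
    """Build the fs.s3a.* settings used in this notebook as a plain dictionary."""
    # Assumption: only an explicit http:// endpoint means SSL should be off.
    use_ssl = not endpoint.lower().startswith("http://")
    return {
        "fs.s3a.endpoint": endpoint,
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        # Path-style access addresses buckets as endpoint/bucket, not bucket.endpoint.
        "fs.s3a.path.style.access": "true",
        "fs.s3a.connection.ssl.enabled": "true" if use_ssl else "false",
    }


# Placeholder values (hypothetical endpoint and credentials).
settings = s3a_settings("http://ceph-nano:8000", "placeholder-access", "placeholder-secret")
for key in sorted(settings):
    print(key)
```

Applying the result is then a loop over `settings.items()` that calls `hadoopConf.set(key, value)` for each pair.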

In [None]:
import socket
spark.range(100, numPartitions=100).rdd.map(lambda x: socket.gethostname()).distinct().collect()

In [None]:
df0 = spark.read.text("s3a://ceph-bucket/object")

In [None]:
df0

# Working with Public Cloud

As of Hadoop 2.8, S3A supports per-bucket configuration. This is very powerful: it allows each bucket to have a distinct S3A configuration, with a different endpoint and a different set of credentials. With this, I can use a single Spark context to read a Parquet file from a bucket in the public cloud (Amazon S3) into a data frame, then turn around and write that data frame as a Parquet file into a bucket that exists in the Ceph Nano service running in Minishift.

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.bucket.radanalytics-data.endpoint", "s3.amazonaws.com")
hadoopConf.set("fs.s3a.bucket.radanalytics-data.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
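
To make the per-bucket override concrete, here is a simplified model of the lookup: an option set as `fs.s3a.bucket.<bucket>.<option>` takes precedence over the base `fs.s3a.<option>` for that bucket only. This is a sketch of the idea, not Hadoop's actual implementation. The function name `effective_s3a_option` and the Ceph endpoint value are hypothetical.

```python
def effective_s3a_option(conf, bucket, option):
    """Return the value of `option` for `bucket`, preferring the per-bucket key."""
    per_bucket_key = "fs.s3a.bucket.{}.{}".format(bucket, option)
    base_key = "fs.s3a.{}".format(option)
    if per_bucket_key in conf:
        return conf[per_bucket_key]
    # Fall back to the base setting shared by all other buckets.
    return conf.get(base_key)


conf = {
    "fs.s3a.endpoint": "http://ceph-nano:8000",  # placeholder Ceph endpoint
    "fs.s3a.bucket.radanalytics-data.endpoint": "s3.amazonaws.com",
}

print(effective_s3a_option(conf, "radanalytics-data", "endpoint"))  # s3.amazonaws.com
print(effective_s3a_option(conf, "ceph-bucket", "endpoint"))        # http://ceph-nano:8000
```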

In [None]:
df1 = spark.read.parquet("s3a://radanalytics-data/wikieod.parquet")

In [None]:
df1.write.parquet("s3a://ceph-bucket/wikieod.parquet")

# ETL Example

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.bucket.bd-dist.endpoint", "s3.amazonaws.com")
hadoopConf.set("fs.s3a.bucket.bd-dist.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

Simply read tab-separated data from a bucket in Amazon S3 and write it back out to a bucket in our Ceph Nano service.

In [None]:
spark.read.csv("s3a://bd-dist/trip_report.tsv",sep="\t").write.csv("s3a://ceph-bucket/trip_report.tsv",sep="\t")

Extract tab-separated data from a bucket in Amazon S3 and reserialize it as Parquet as we write it out to a bucket in our Ceph Nano service.

In [None]:
spark.read.csv("s3a://bd-dist/trip_report.tsv",sep="\t").write.parquet("s3a://ceph-bucket/trip_report.parquet")

# Working with SparkSQL

In [None]:
df1.schema.jsonValue()

In [None]:
df1.count()

In [None]:
# registerTempTable is deprecated; createOrReplaceTempView is its replacement
df1.createOrReplaceTempView("wikieod")

In [None]:
rht = spark.sql("select * from wikieod where ticker = 'RHT'")

In [None]:
rht.show()

In [None]:
%matplotlib inline
import seaborn as sns
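# sns.tsplot has been removed from seaborn; lineplot draws the same closing-price series
closes = rht.sort(rht.date).toPandas()["close"]
_ = sns.lineplot(x=closes.index, y=closes.values)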

# Working with Local TensorFlow

We'll start by installing TensorFlow, along with several other machine learning libraries that we will need for our machine learning example.

In [None]:
import sys
!{sys.executable} -m pip install scikit-learn tensorflow keras pandas matplotlib seaborn

__Access the data using Spark__

We already established the Spark Context above, so we can use it to load the trip report from the TSV object we wrote into our Ceph Nano service.

In [None]:
feedbackFile = spark.read.csv("s3a://ceph-bucket/trip_report.tsv",sep="\t", header=True)

Alternatively, we can load the trip report from the original sample TSV object in Amazon S3.

In [None]:
feedbackFile = spark.read.csv("s3a://bd-dist/trip_report.tsv",sep="\t", header=True)

__Convert the data to a Pandas data frame__

In [None]:
import re

import pandas as pd
import matplotlib.pyplot as plt

df = feedbackFile.toPandas()

df.head()

# Visualize the data

__Types of trip outcomes by field representative__

In [None]:
import numpy as np
np.random.seed(sum(map(ord, "categorical")))

from matplotlib.colors import ListedColormap
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid", color_codes=True)

outcome_dict = {'Successful':0,'Partial Success':1,'Unsuccessful':2 }

# Copy the slice so the new column is not written to a view of df
df_vis = df[['Your Name', 'Outcome']].copy()
df_vis['outcome_numeric'] = df_vis['Outcome'].apply(lambda a:outcome_dict[a])



outcome_cross_table = pd.crosstab(index=df_vis["Your Name"], 
                          columns=df_vis["Outcome"])


outcome_cross_table.plot(kind="bar", 
                 figsize=(16,12),
                 stacked=True,fontsize=12)
plt.show();
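
For readers unfamiliar with `pd.crosstab`, the table plotted above is a count of how often each (name, outcome) pair occurs. The following sketch reproduces that counting with the standard library only. The rows are hypothetical stand-ins for the `Your Name` and `Outcome` columns, not data from the trip report.

```python
from collections import Counter

# Hypothetical (name, outcome) rows standing in for the real columns.
rows = [
    ("Alice", "Successful"),
    ("Alice", "Unsuccessful"),
    ("Bob", "Successful"),
    ("Alice", "Successful"),
]

counts = Counter(rows)
names = sorted({name for name, _ in rows})
outcomes = sorted({outcome for _, outcome in rows})

# One row per name, one column per outcome, zero where a pair never occurs.
table = {
    name: {outcome: counts[(name, outcome)] for outcome in outcomes}
    for name in names
}

for name in names:
    print(name, table[name])
```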

__Types of outcomes by event type__

In [None]:
event_type_cross_table = pd.crosstab(index=df["Primary Audience Engaged"], 
                          columns=df["Outcome"])

event_type_cross_table.plot(kind="bar", 
                 figsize=(16,12),
                 stacked=True,fontsize=12)
plt.show();

# Now convert "Highlights" data to prepare for training the model

In [None]:
df['Highlights'] = df['Highlights'].astype(str)

df[['Highlights','Outcome']].head(20)

In [None]:
df_outcome = df[['Highlights','Outcome']]

# The 'display.height' option has been removed from pandas, so it is not set here
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

grouped_highlights = pd.DataFrame(df_outcome.groupby('Outcome')['Highlights'].apply(lambda x: "%s" % ' '.join(x)))

# Index.get_values() has been removed from pandas; list() on the index gives the same labels
grouped_highlights['Outcome'] = list(grouped_highlights.index)
grouped_highlights.reset_index(drop=True, inplace=True)

grouped_highlights['Highlights'] = grouped_highlights['Highlights'].astype(str)

df['Highlights'] = df['Highlights'].apply(lambda a: a.lower())

df_success = df[df['Outcome'] == 'Successful']
df_unsuccess = df[df['Outcome'] == 'Unsuccessful']
df_part_success = df[df['Outcome'] == 'Partial Success']

__Import additional Machine Learning libraries__

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Dense, Embedding, LSTM
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical

__Separating train and test data. Taking successful and unsuccessful separately__

In [None]:
# DataFrame.append was removed in pandas 2.0; pd.concat is the replacement
df_failure = pd.concat([df_part_success, df_unsuccess], ignore_index=True)

df_failure['Outcome'] = 'Unsuccessful'

test_hold_out = 0.1

#### Success

train = df_success[ : -int(test_hold_out * len(df_success))]
test = df_success[-int(test_hold_out * len(df_success)) : ]

#### Failure

train = pd.concat([train, df_failure[ : -int(test_hold_out * len(df_failure))]])
test = pd.concat([test, df_failure[-int(test_hold_out * len(df_failure)) : ]])


train = train.sample(frac = 1)
train['type'] = "Train"
test['type'] = "Test"

train = pd.concat([train, test])

train.reset_index(drop=True,inplace=True)

# Request float columns explicitly; recent pandas versions return booleans by default
Y = pd.get_dummies(train['Outcome'], dtype=float).values

test_index_list = list(train[train['type'] == 'Test'].index)

test_index_list
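
The slicing pattern used above has an edge case: when a group has fewer than ten rows, `int(test_hold_out * len(...))` is `0`, and `frame[:-0]` is an empty slice while `frame[-0:]` is everything, so the whole group lands in the test set. The sketch below shows one way to guard against that on a plain list. The helper name `hold_out_split` is hypothetical, and keeping all rows for training when the hold-out rounds to zero is an assumption about the desired behavior.

```python
def hold_out_split(items, fraction):
    """Split `items` into (train, test), holding out the last `fraction` of rows."""
    n_test = int(fraction * len(items))
    if n_test == 0:
        # Too few rows to hold any out: keep everything for training.
        return items[:], items[:0]
    return items[:-n_test], items[-n_test:]


train_rows, test_rows = hold_out_split(list(range(20)), 0.1)
print(len(train_rows), len(test_rows))

small_train, small_test = hold_out_split([1, 2, 3], 0.1)
print(small_train, small_test)
```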

# Use the HIGHLIGHTS field for sentiment analysis

__max_features__ = vocabulary size; it is a hyperparameter.

The Tokenizer creates vectors from text. It works like a dictionary that assigns an id to each word in the vocabulary, and for each text it returns a list of integers, where every integer is the index of a word.

In [None]:
max_fatures = 10000
tokenizer = Tokenizer(num_words=max_fatures, split=' ')
tokenizer.fit_on_texts(train['Highlights'].values)
X_highlights = tokenizer.texts_to_sequences(train['Highlights'].values)
X_highlights = pad_sequences(X_highlights)
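
To show what the tokenizer and padding steps produce, here is a minimal pure-Python sketch of the same idea: words are ranked by frequency starting at index 1, each text becomes a list of those indices, and shorter lists are padded with zeros at the front. It is a simplification and not the Keras implementation. It skips punctuation filtering, breaks frequency ties alphabetically, and uses hypothetical function names and sample texts.

```python
from collections import Counter


def fit_word_index(texts):
    """Map each word to a rank: 1 for the most frequent word, 2 for the next, ..."""
    counts = Counter(word for text in texts for word in text.lower().split())
    # Ties are broken alphabetically so the result is deterministic.
    ranked = sorted(counts, key=lambda word: (-counts[word], word))
    return {word: rank for rank, word in enumerate(ranked, start=1)}


def texts_to_padded_sequences(texts, word_index, num_words):
    """Convert texts to index lists, keep indices below num_words, left-pad with 0."""
    sequences = [
        [word_index[w] for w in text.lower().split()
         if w in word_index and word_index[w] < num_words]
        for text in texts
    ]
    width = max(len(seq) for seq in sequences)
    return [[0] * (width - len(seq)) + seq for seq in sequences]


texts = ["great demo great crowd", "demo failed"]  # hypothetical highlights
word_index = fit_word_index(texts)
padded = texts_to_padded_sequences(texts, word_index, num_words=10000)
print(word_index)
print(padded)
```

Index 0 never refers to a word, which is why it is safe to use as padding.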

__Creating the network layer by layer__

The first layer is a word embedding layer, the second is an LSTM-based RNN, and the third is a dense layer with softmax activation, because the outcome is categorical.

In [None]:
embed_dim = 128
lstm_out = 196

model = Sequential()
# Keras 2 dropped Embedding's dropout argument and renamed the LSTM dropout arguments
model.add(Embedding(max_fatures, embed_dim,input_length = X_highlights.shape[1]))
model.add(LSTM(lstm_out, dropout=0.1, recurrent_dropout=0.1))
model.add(Dense(2,activation='softmax'))
model.compile(loss = 'categorical_crossentropy', optimizer='adam',metrics = ['accuracy'])
print(model.summary())
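
The parameter counts that `model.summary()` reports can be checked by hand. The sketch below works through the arithmetic for the three layers using the sizes from the cell above, assuming the standard layer definitions: an embedding matrix, an LSTM with four gates that each have input weights, recurrent weights, and a bias, and a dense layer with a bias. The helper function names are hypothetical.

```python
def embedding_params(vocab_size, embed_dim):
    # One embed_dim-sized vector per vocabulary entry.
    return vocab_size * embed_dim


def lstm_params(input_dim, units):
    # Four gates, each with input weights, recurrent weights, and a bias.
    return 4 * (units * (input_dim + units) + units)


def dense_params(input_dim, units):
    # Weight matrix plus one bias per output unit.
    return input_dim * units + units


vocab_size, embed_dim, lstm_out, n_classes = 10000, 128, 196, 2

counts = {
    "embedding": embedding_params(vocab_size, embed_dim),
    "lstm": lstm_params(embed_dim, lstm_out),
    "dense": dense_params(lstm_out, n_classes),
}
total = sum(counts.values())
print(counts, total)
```

The embedding layer dominates the total, so `max_fatures` is the setting with the largest effect on model size.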

__Separating train and test data__

In [None]:
X_highlights_train = X_highlights[0:test_index_list[0]]
Y_highlights_train = Y[0:test_index_list[0]]

X_highlights_test = X_highlights[test_index_list[0]:]
Y_highlights_test = Y[test_index_list[0]:]

__Running the model__

The batch size and the number of epochs can be tuned to improve the results.

In [None]:
batch_size = 20
model.fit(X_highlights_train, Y_highlights_train, epochs = 10, batch_size=batch_size, verbose = 2)

__Printing test data accuracy__

In [None]:
score,accuracy = model.evaluate(X_highlights_test, Y_highlights_test, verbose = 2, batch_size = batch_size)
print("score: %.2f" % (score))
print("accuracy: %.2f" % (accuracy))
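
Beyond the aggregate accuracy, it is often useful to turn the softmax outputs back into outcome labels. Because `pd.get_dummies` orders its columns by sorted label, column 0 corresponds to `Successful` and column 1 to `Unsuccessful` here. The sketch below decodes rows of probabilities under that assumption. The helper name `decode_predictions` and the probability values are hypothetical.

```python
def decode_predictions(probabilities, class_names):
    """Return the class name with the highest probability for each row."""
    labels = []
    for row in probabilities:
        best = max(range(len(row)), key=row.__getitem__)  # index of the largest value
        labels.append(class_names[best])
    return labels


# Column order follows the sorted labels, matching the one-hot encoding of Y.
class_names = sorted(["Unsuccessful", "Successful"])

# Hypothetical softmax outputs for two trip reports.
probabilities = [[0.9, 0.1], [0.2, 0.8]]
print(decode_predictions(probabilities, class_names))
```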

# Save the model, tokenizer and feature dimension and store them in Ceph

In [None]:
model.save("./model")

import pickle

with open('./tokenizer.pickle', 'wb') as handle:
    pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

feature_dimension = X_highlights_train.shape[1]
with open('./feature_dimension.pickle', 'wb') as handle:
    pickle.dump(feature_dimension, handle, protocol=pickle.HIGHEST_PROTOCOL)
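
Whoever consumes these artifacts later has to reverse the pickling step. The sketch below shows the round trip for the feature dimension using a temporary directory, so it does not touch the files written above. The value `42` is a hypothetical stand-in for the real feature dimension. Note that `pickle.load` should only be used on files from a trusted source, since unpickling can execute arbitrary code.

```python
import os
import pickle
import tempfile

feature_dimension_example = 42  # hypothetical stand-in value

with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "feature_dimension.pickle")

    # Write the value the same way the notebook does.
    with open(path, "wb") as handle:
        pickle.dump(feature_dimension_example, handle, protocol=pickle.HIGHEST_PROTOCOL)

    # Read it back, as a downstream consumer would after downloading the object.
    with open(path, "rb") as handle:
        restored = pickle.load(handle)

print(restored == feature_dimension_example)
```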

In [None]:
import os
import boto3

# Create a session with the Ceph RGW credentials used earlier in the notebook
session = boto3.Session(
    aws_access_key_id = os.environ['RGW_USER_USER_KEY'],
    aws_secret_access_key = os.environ['RGW_USER_SECRET_KEY']
)

# Point the S3 resource at the Ceph gateway; verify=False skips TLS certificate checks
s3 = session.resource('s3', endpoint_url=os.environ['RGW_API_ENDPOINT'], verify=False)

# Upload the model to S3
s3.meta.client.upload_file('./model', 'ceph-bucket', 'models/trip_report_model')

# Upload the tokenizer to S3
s3.meta.client.upload_file('./tokenizer.pickle', 'ceph-bucket', 'models/trip_report_tokenizer.pickle')

# Upload the feature dimension to S3
s3.meta.client.upload_file('./feature_dimension.pickle', 'ceph-bucket', 'models/trip_report_feature_dimension.pickle')

The model has been saved to S3 as binary objects and can be viewed.