# Open Data Hub and Object Storage

The intent of this notebook is to provide examples of how data engineers and data scientists can use Open Data Hub and object storage, specifically Ceph object storage, in much the same way they are accustomed to interacting with Amazon Simple Storage Service (S3). This is possible because Ceph's object storage gateway closely follows the Amazon S3 API.

# Working with Boto

Boto is an integrated interface to current and future infrastructural services offered by Amazon Web Services. Among the services it provides interfaces for is Amazon S3. For lightweight analysis of data using Python tools like NumPy or pandas, it is handy to interact with data stored in object storage using pure Python. This is where Boto shines.

In [None]:
import sys

In [None]:
import os
import boto3


s3_endpoint_url = os.environ['S3_ENDPOINT_URL']
s3_access_key = os.environ['AWS_ACCESS_KEY_ID']
s3_secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
s3_bucket_name = os.environ['JUPYTERHUB_USER']

print(s3_endpoint_url)
print(s3_bucket_name)
s3 = boto3.client('s3','us-east-1', endpoint_url= s3_endpoint_url,
                       aws_access_key_id = s3_access_key,
                       aws_secret_access_key = s3_secret_key)
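
The cell above reads four environment variables and fails with a bare `KeyError` on the first one that is missing. A minimal sketch of a friendlier check is shown here; `load_s3_settings` and `REQUIRED_VARS` are hypothetical names introduced for illustration, not part of Open Data Hub or Boto.

```python
import os

# Hypothetical helper: the variable names are the ones read in the cell above.
REQUIRED_VARS = (
    "S3_ENDPOINT_URL",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "JUPYTERHUB_USER",
)


def load_s3_settings(env=None):
    """Return the S3 settings as a dict, or raise one error naming every missing variable."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling `load_s3_settings()` with no argument checks the real environment; passing a plain dict makes the check easy to try out without touching `os.environ`.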


# Creating a bucket, uploading an object (put), and listing the bucket.

In [None]:
s3.create_bucket(Bucket=s3_bucket_name)
s3.put_object(Bucket=s3_bucket_name,Key='object',Body='data')
for key in s3.list_objects(Bucket=s3_bucket_name)['Contents']:
    print(key['Key'])
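
The listing above works for a small bucket, but a single list call returns at most one page of results, and the `'Contents'` entry is absent when nothing matches. The sketch below walks every page with a Boto paginator. `iter_keys` is a hypothetical helper name, and it assumes the gateway supports the `list_objects_v2` operation; if it does not, the `list_objects` paginator can be used the same way.

```python
def iter_keys(client, bucket, prefix=""):
    """Yield every object key in `bucket` under `prefix`, following pagination."""
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page has no 'Contents' entry when no objects match.
        for entry in page.get("Contents", []):
            yield entry["Key"]
```

With the client created earlier, usage would look like `for key in iter_keys(s3, s3_bucket_name): print(key)`.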

# Working with Spark

The Open Data Hub operator installs Spark. Each JupyterHub user also gets a dedicated Spark cluster (master and workers). The first step is to connect to the Spark cluster and get a Spark session.

In [None]:
import os
import pyspark

from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext

##################################################
# This is a HACK until we can resolve the issue
# preventing the spark nodes from resolving pod
# names
spark_driver_host = "10.128.2.70"
##################################################

#os.environ["PYSPARK_SUBMIT_ARGS"] = f"--conf spark.driver.host={spark_driver_host} --conf spark.cores.max=6 --conf spark.executor.instances=2 --conf spark.executor.memory=3G --conf spark.executor.cores=3 --conf spark.driver.memory=4G --packages com.amazonaws:aws-java-sdk:1.8.0,org.apache.hadoop:hadoop-aws:2.8.5 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--conf spark.driver.host={spark_driver_host} --packages com.amazonaws:aws-java-sdk:1.8.0,org.apache.hadoop:hadoop-aws:2.8.5 pyspark-shell"
spark_cluster_url = f"spark://{os.environ['SPARK_CLUSTER']}:7077"
spark = SparkSession.builder.master(spark_cluster_url).getOrCreate()
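
The `PYSPARK_SUBMIT_ARGS` string above is long and easy to mistype. As a sketch, it can be assembled from a dictionary of Spark settings and a list of packages. `build_submit_args` is a hypothetical helper, and the values shown are copied from the cell above for illustration only.

```python
def build_submit_args(conf, packages):
    """Build a PYSPARK_SUBMIT_ARGS string from Spark conf entries and Maven packages."""
    parts = [f"--conf {key}={value}" for key, value in conf.items()]
    if packages:
        parts.append("--packages " + ",".join(packages))
    # The string must end with 'pyspark-shell' so PySpark starts its gateway.
    parts.append("pyspark-shell")
    return " ".join(parts)


submit_args = build_submit_args(
    {"spark.driver.host": "10.128.2.70", "spark.executor.memory": "3G"},
    ["com.amazonaws:aws-java-sdk:1.8.0", "org.apache.hadoop:hadoop-aws:2.8.5"],
)
print(submit_args)
```

The result would be assigned to `os.environ["PYSPARK_SUBMIT_ARGS"]` before the Spark session is created, exactly as in the cell above.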

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.endpoint", s3_endpoint_url)
hadoopConf.set("fs.s3a.access.key", s3_access_key)
hadoopConf.set("fs.s3a.secret.key", s3_secret_key)
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.connection.ssl.enabled", "false")

In [None]:
import socket
spark.range(100, numPartitions=100).rdd.map(lambda x: socket.gethostname()).distinct().collect()

In [None]:
df0 = spark.read.text(f"s3a://{s3_bucket_name}/object")

In [None]:
df0
print("Total number of rows in df0: %d" % df0.count())
df0.printSchema()
df0.show()
df0.select("value").show()

# Working with a Hybrid Data Context

As of Hadoop 2.8, S3A supports per-bucket configuration. This is very powerful: each bucket can have its own S3A configuration, with a different endpoint and a different set of credentials. With this, a single Spark context can read a Parquet file from a bucket in the public cloud (Amazon S3) into a data frame, then write that data frame as a Parquet file into a bucket in the local Ceph(Rook) cluster.

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.bucket.bd-dist.endpoint", "s3.amazonaws.com")
hadoopConf.set("fs.s3a.bucket.bd-dist.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
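
Per-bucket options follow the pattern `fs.s3a.bucket.<bucket>.<option>`, where `<option>` is the usual `fs.s3a.` setting with that prefix removed. The sketch below generates those keys from a plain dictionary. `per_bucket_conf` is a hypothetical helper; the bucket name and values are the ones used in the cell above.

```python
def per_bucket_conf(bucket, options):
    """Map S3A option names (without the 'fs.s3a.' prefix) to per-bucket keys."""
    return {f"fs.s3a.bucket.{bucket}.{name}": value for name, value in options.items()}


public_conf = per_bucket_conf(
    "bd-dist",
    {
        "endpoint": "s3.amazonaws.com",
        "aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    },
)
for key, value in public_conf.items():
    print(key, "=", value)
```

Each pair would then be applied with `hadoopConf.set(key, value)`, which keeps the settings for several buckets in one place.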

__Public to Private ETL__

Read tab-separated data from a bucket in Amazon S3 and write it back out to a bucket in our Ceph(Rook) service.

In [None]:
tripreport = spark.read.csv("s3a://bd-dist/trip_report.tsv",sep="\t")
print("Total number of rows in tripreport: %d" % tripreport.count())
tripreport.printSchema()
tripreport.show()

In [None]:
spark.read.csv("s3a://bd-dist/trip_report.tsv",sep="\t").write.csv(f"s3a://{s3_bucket_name}/trip_report.tsv",sep="\t",mode="overwrite")

Read all JSON files under a bucket prefix (pseudo-directory) in Amazon S3 into a data frame. The data frame can then be written to a bucket in our Ceph(Rook) service under the same prefix.

In [None]:
jsonFile= spark.read.option("multiline", True).option("mode", "PERMISSIVE").json("s3a://bd-dist/kube-metrics")
print("Total number of rows in jsonFile: %d" % jsonFile.count())
jsonFile.printSchema()
jsonFile.show()
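
To keep the same prefix when copying between buckets, the destination URI can be derived from the source URI. This is a minimal sketch: `mirror_uri` is a hypothetical helper, and `alice` stands in for the bucket name held in `s3_bucket_name`.

```python
from urllib.parse import urlparse


def mirror_uri(source_uri, dest_bucket):
    """Return a URI with the same scheme and path as `source_uri` but in `dest_bucket`."""
    parsed = urlparse(source_uri)
    return f"{parsed.scheme}://{dest_bucket}{parsed.path}"


# 'alice' is a placeholder bucket name used only for this illustration.
destination = mirror_uri("s3a://bd-dist/kube-metrics", "alice")
print(destination)
```

In the notebook, the write itself would then be along the lines of `jsonFile.write.json(mirror_uri("s3a://bd-dist/kube-metrics", s3_bucket_name), mode="overwrite")`.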

# Working with SparkSQL

Load Prometheus data set from Ceph(Rook) into a data frame.

__Import statistics libraries__

In [None]:
import pandas as pd
import json
import numpy as np
import seaborn as sns
import sys
import matplotlib.pyplot as plt
%matplotlib inline

from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

__Display schema of files__

In [None]:
print('Display schema:')
jsonFile.printSchema()

__Query the JSON data using filters__

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
# createOrReplaceTempView replaces the deprecated registerTempTable.
jsonFile.createOrReplaceTempView("kubelet_docker_operations_latency_microseconds")

#Filter the results into a data frame
data = spark.sql("SELECT values, metric.operation_type FROM kubelet_docker_operations_latency_microseconds WHERE metric.quantile='0.9' AND metric.hostname='free-stg-master-03fb6'")

data.show()
data.printSchema()

In [None]:
data_pd = data.toPandas()



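# Flatten the per-series [timestamp, value] pairs into one row per sample.
frames = []
for op in set(data_pd['operation_type']):
    dict_raw = data_pd[data_pd['operation_type'] == op]['values']
    list_raw = []
    for key in dict_raw.keys():
        list_raw.extend(dict_raw[key])
    temp_frame = pd.DataFrame(list_raw, columns = ['utc_timestamp','value'])
    temp_frame['operation_type'] = op
    frames.append(temp_frame)

# DataFrame.append was removed in pandas 2.0, so concatenate the pieces instead.
if frames:
    df2 = pd.concat(frames)
else:
    df2 = pd.DataFrame(columns = ['utc_timestamp','value', 'operation_type'])

# Drop samples without a reading, then convert the remaining values to integers.
df2 = df2[df2['value'] != 'NaN']
df2['value'] = df2['value'].apply(lambda a: int(a))

# Note: fromtimestamp() converts to the local time zone of the notebook host.
df2['timestamp'] = df2['utc_timestamp'].apply(lambda a : datetime.fromtimestamp(int(a)))

df2.head()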
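
The same flattening can be sketched in plain Python, which makes the shape of the data easier to see. This assumes each series carries a list of `[timestamp, value]` pairs with values encoded as strings, as the cell above implies. `flatten_samples` is a hypothetical helper and the sample numbers are made up for illustration. Unlike the cell above, it produces timezone-aware UTC timestamps and accepts fractional values by truncating them.

```python
import math
from datetime import datetime, timezone


def flatten_samples(series):
    """Turn (operation_type, [[timestamp, value], ...]) items into one dict per sample."""
    rows = []
    for operation_type, samples in series:
        for timestamp, value in samples:
            number = float(value)
            if math.isnan(number):
                continue  # skip samples that carry no reading
            rows.append({
                "utc_timestamp": int(timestamp),
                "value": int(number),
                "operation_type": operation_type,
                "timestamp": datetime.fromtimestamp(int(timestamp), tz=timezone.utc),
            })
    return rows


# Made-up sample data: one valid reading and one NaN.
rows = flatten_samples([("list_images", [[1525000000, "1200"], [1525000060, "NaN"]])])
print(rows)
```

A list of dicts like this can be passed straight to `pd.DataFrame(rows)` if a pandas data frame is needed.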

__Objective - Verify Above Alerts__

Store timestamp with data

In [None]:
# Renumber the rows from 0 and discard the old, duplicated index.
df2.reset_index(drop=True, inplace=True)

print(df2['operation_type'].unique())
df2.head()

# Save Dataframe in local Ceph(Rook)


In [None]:
# Convert Pandas DataFrame to Spark DataFrame
dfSpark = spark.createDataFrame(df2)
dfSpark.printSchema()

In [None]:
dfSpark.write.csv(f"s3a://{s3_bucket_name}/kube-metrics/operationinfo.csv",sep="\t",mode="overwrite",header=True)

Read the file back to verify the write.

In [None]:
testdf = spark.read.csv(f"s3a://{s3_bucket_name}/kube-metrics/operationinfo.csv",sep="\t",header=True)
testdf.printSchema()
testdf.show()

In [None]:
spark.stop()