In [None]:
!pip install install-jdk
!pip install pyspark --user
!pip install findspark --user

In [None]:
# Download and unpack a JDK 11 build with the install-jdk package. Spark's JVM
# side needs a Java runtime; the next cell points JAVA_HOME at a directory under
# ~/.jdk where this install is expected to land.
import jdk
jdk.install('11')

In [None]:
import glob
import os

import pyspark
import findspark
from pyspark.sql import SparkSession


def _find_java_home(default="/home/jovyan/.jdk/jdk-11.0.18+10"):
    """Return a JDK 11 directory unpacked by install-jdk under ~/.jdk.

    The exact build (e.g. 11.0.18+10) depends on what install-jdk downloaded, so
    the directory is discovered instead of hard-coded. When several directories
    match, the last one in lexicographic order is used. When none match,
    `default` (the previously hard-coded path) is returned unchanged.
    """
    pattern = os.path.join(os.path.expanduser("~"), ".jdk", "jdk-11*")
    candidates = sorted(path for path in glob.glob(pattern) if os.path.isdir(path))
    return candidates[-1] if candidates else default


os.environ["JAVA_HOME"] = _find_java_home()
# Python interpreter the executors run; this path must exist inside the executor image.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.8"
# NOTE(review): the driver is this already-running kernel, so this setting likely
# has no effect here; kept for parity with launcher-based (spark-submit) runs.
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.8"
findspark.init()
from pyspark import SparkContext, SparkConf

# Schedule executors on Kubernetes through the in-cluster API server address.
conf = SparkConf().setAppName('sparktest').setMaster('k8s://https://kubernetes.default.svc:443')
conf.set("spark.kubernetes.namespace", "kubeflow-user-example-com")
conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "default-editor")

conf.set("spark.kubernetes.container.image", "lexcc/spark-py")
conf.set("spark.kubernetes.executor.container.image", "lexcc/spark-py")
conf.set("spark.kubernetes.container.image.pullPolicy", "Always")

conf.set("spark.kubernetes.allocation.batch.size", "5")
# Spark's executor-count key is spark.executor.instances; there is no
# "spark.kubernetes.executor.instances" setting, so that key was silently ignored.
conf.set("spark.executor.instances", "1")
# Executors connect back to the driver (this notebook) on this host and these ports.
# NOTE(review): "jupyter" presumably resolves to a Service in front of this pod.
conf.set("spark.driver.bindAddress", "0.0.0.0")
conf.set("spark.driver.host", "jupyter")
conf.set("spark.driver.port", "37371")
conf.set("spark.blockManager.port", "6060")

# S3A filesystem support. NOTE(review): the hadoop-aws version must match the
# Hadoop version bundled with the installed pyspark.
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.2')
# S3 credentials/endpoint from the environment (None when unset).
ACCESS_KEY = os.environ.get('ACCESS_KEY')
SECRET_KEY = os.environ.get('SECRET_KEY')
S3_ENDPOINT = os.environ.get('S3_ENDPOINT')

In [None]:
# Build (or reuse) a SparkSession from the SparkConf prepared above.
# The appName given here replaces the 'sparktest' name stored on `conf`.
session_builder = SparkSession.builder.config(conf=conf).appName("Word Count")
spark = session_builder.getOrCreate()
sc = spark.sparkContext

In [None]:
def setup_s3_connection(access_key=None, secret_key=None, s3_endpoint=None):
    """Override the module-level S3 settings and check that all three are set.

    Each argument, when given and non-empty, replaces the matching global
    (ACCESS_KEY, SECRET_KEY, S3_ENDPOINT). Otherwise the current global value is
    kept; the configuration cell reads it from the environment.

    Raises:
        ValueError: if any of the three settings is still missing or empty.
    """
    global ACCESS_KEY, SECRET_KEY, S3_ENDPOINT
    # Empty strings (e.g. an unfilled "" placeholder) are treated like None so
    # they cannot clobber credentials that came from the environment.
    if access_key:
        ACCESS_KEY = access_key
    if secret_key:
        SECRET_KEY = secret_key
    if s3_endpoint:
        S3_ENDPOINT = s3_endpoint
    missing = [
        name
        for name, value in (
            ("ACCESS_KEY", ACCESS_KEY),
            ("SECRET_KEY", SECRET_KEY),
            ("S3_ENDPOINT", S3_ENDPOINT),
        )
        if not value
    ]
    if missing:
        raise ValueError(
            "Missing information, S3 connection can't be set: " + ", ".join(missing)
        )

# Validate the S3 settings read from the environment. To override them, pass
# access_key=..., secret_key=..., s3_endpoint=... (non-empty strings) here.
setup_s3_connection()

In [None]:
# Configure the S3A connector on this SparkContext's Hadoop configuration.
# NOTE: `_jsc` is a private PySpark attribute; the same keys could instead be
# supplied as `spark.hadoop.fs.s3a.*` entries on `conf` before session creation.
hadoop_conf = sc._jsc.hadoopConfiguration()
s3a_settings = {
    "fs.s3a.access.key": ACCESS_KEY,
    "fs.s3a.secret.key": SECRET_KEY,
    "fs.s3a.endpoint": S3_ENDPOINT,
    # Address buckets in the URL path rather than as a host-name prefix.
    "fs.s3a.path.style.access": "true",
    # SSL disabled for the connection to the endpoint.
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}
for setting, value in s3a_settings.items():
    hadoop_conf.set(setting, value)

In [None]:
import boto3
from botocore.exceptions import ClientError

def connect_to_s3():
    """Return a boto3 S3 resource for the configured endpoint.

    Uses the module-level ACCESS_KEY / SECRET_KEY as credentials and
    S3_ENDPOINT as the endpoint URL.
    """
    credentials = {
        "aws_access_key_id": ACCESS_KEY,
        "aws_secret_access_key": SECRET_KEY,
    }
    return boto3.Session(**credentials).resource("s3", endpoint_url=S3_ENDPOINT)

def create_bucket(s3, bucket_name):
    """Create `bucket_name` through the given S3 resource and return None.

    Any error raised by `s3.create_bucket` propagates to the caller.
    """
    s3.create_bucket(Bucket=bucket_name)
    return None

def create_bucket_if_not_existed(s3, bucket_name):
    """Create `bucket_name` unless `bucket_exist` reports it as reachable."""
    if not bucket_exist(s3, bucket_name):
        create_bucket(s3, bucket_name)

def bucket_exist(s3, bucket_name):
    """Return True if `head_bucket` on `bucket_name` succeeds, False otherwise.

    Any botocore ClientError (the bucket is missing, or access is denied) is
    reported with a printed message and treated as "does not exist".
    """
    client = s3.meta.client
    try:
        client.head_bucket(Bucket=bucket_name)
    except ClientError:
        print("The bucket does not exist or you have no access.")
        return False
    else:
        return True

def print_file_name_in_bucket(s3, bucket_name):
    """Print the key of every object in `bucket_name`, one per line."""
    keys = (summary.key for summary in s3.Bucket(bucket_name).objects.all())
    for key in keys:
        print(key)

def get_file_from_bucket(s3, bucket_name, file_name, content=None):
    """Fetch object `file_name` from `bucket_name`.

    Args:
        s3: boto3 S3 resource.
        bucket_name: bucket to read from.
        file_name: object key.
        content: when None, return the (loaded) Object handle; otherwise return
            the streaming body of a GET on the object.

    Returns:
        The Object or its body stream, or None if the request fails with a
        botocore ClientError. A missing key prints "File not found".
    """
    obj = s3.Object(bucket_name, file_name)
    try:
        # s3.Object() is lazy and never contacts the server, so the existence
        # check has to happen here via load() (HEAD) or get() (GET).
        if content is None:
            obj.load()
            return obj
        return obj.get()['Body']
    except ClientError as e:
        code = e.response.get('Error', {}).get('Code')
        # HEAD reports a missing key as "404", GET as "NoSuchKey".
        if code in ("404", "NoSuchKey"):
            print("File not found")
        else:
            print(f"Could not fetch {file_name!r} from {bucket_name!r}: {code}")
        return None

def upload_file_to_bucket(s3, bucket_name, file_name, saved_name=None):
    """Upload the local file `file_name` into `bucket_name`.

    The object key is `saved_name`, or `file_name` itself when `saved_name` is None.
    """
    object_key = file_name if saved_name is None else saved_name
    s3.meta.client.upload_file(file_name, bucket_name, object_key)

def delete_bucket(s3, bucket_name, force):
    """Delete `bucket_name`; when `force` is truthy, delete all its objects first."""
    target = s3.Bucket(bucket_name)
    if not force:
        target.delete()
        return
    target.objects.all().delete()
    target.delete()

# Smoke test of the helpers above: create a scratch bucket, upload train.csv,
# list its contents, and always remove the bucket again.
s3 = connect_to_s3()
bucket_name = "test"
file = "train.csv"
# create_bucket is used deliberately instead of create_bucket_if_not_existed.
# A pre-existing bucket named "test" is then not silently reused and
# force-deleted below.
create_bucket(s3, bucket_name)
try:
    upload_file_to_bucket(s3, bucket_name, file)
    print_file_name_in_bucket(s3, bucket_name)
    #print(get_file_from_bucket(s3, bucket_name, file, True).read())
finally:
    # Clean up even when the upload or listing fails, so a re-run does not find
    # a leftover "test" bucket.
    delete_bucket(s3, bucket_name, True)

In [None]:
import pandas as pd

# Cast these columns to str so their NaNs become the literal string "nan".
# That way the dropna() below does not discard those rows.
# NOTE(review): presumably NaN means "feature absent" in these columns (e.g. no
# fireplace / fence); confirm against the dataset's data dictionary.
NA_AS_CATEGORY_COLUMNS = [
    "FireplaceQu", "Fence", "MiscFeature",
    "BsmtQual", "BsmtCond", "BsmtExposure", "BsmtFinType1", "BsmtFinType2",
    "Alley",
    "GarageType", "GarageFinish", "GarageQual", "GarageCond",
]

df_p = pd.read_csv('./train.csv')
df_p[NA_AS_CATEGORY_COLUMNS] = df_p[NA_AS_CATEGORY_COLUMNS].astype(str)
# Drop rows that still have a missing value in any other column.
df_p = df_p.dropna()

bucket_name = "mlpipeline"
folder_name = "result"
create_bucket_if_not_existed(s3, bucket_name)
df = spark.createDataFrame(df_p)
s3_path = f's3a://{bucket_name}/{folder_name}'
# mode("overwrite") makes the cell re-runnable. The default save mode ("error")
# fails once the output path already exists.
df.write.format("csv").option("header", "true").mode("overwrite").save(s3_path)

In [None]:
from pyspark.sql.context import SQLContext
import csv
import io

# SQLContext is deprecated since Spark 3.0 and `spark.read` does the same job.
# The object is still created in case later cells use `sqlContext`; its first
# argument must be the SparkContext, not the SparkSession.
sqlContext = SQLContext(sc, spark)

# True: write through the RDD API (saveAsTextFile). False: DataFrame CSV writer.
store_in_RDD = True

# Both paths are rebuilt from their parts instead of `s3_path += "_new"`, so
# re-running this cell does not keep appending "_new".
input_path = 's3a://%s/%s' % (bucket_name, folder_name)
s3_path = input_path + "_new"

# Read the CSV written by the previous cell (all columns come back as strings).
df = spark.read.csv(input_path, header=True)


def _row_to_csv_line(row):
    """Render one Row as a single CSV line (None becomes an empty field)."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="").writerow(
        ["" if value is None else value for value in row]
    )
    return buffer.getvalue()


def _delete_path_if_exists(path):
    """Remove `path` via the Hadoop FileSystem API (saveAsTextFile cannot overwrite)."""
    jvm_path = sc._jvm.org.apache.hadoop.fs.Path(path)
    fs = jvm_path.getFileSystem(sc._jsc.hadoopConfiguration())
    if fs.exists(jvm_path):
        fs.delete(jvm_path, True)


if store_in_RDD:
    df_rdd = df.rdd
    _delete_path_if_exists(s3_path)
    # Plain saveAsTextFile on Rows would write "Row(...)" reprs, which cannot be
    # read back as CSV. Serialize each Row as a CSV line instead. No header is
    # written, so every part file holds data only; the column names are
    # restored from `df` after reading.
    df_rdd.map(_row_to_csv_line).saveAsTextFile(s3_path)
    # escape='"' matches the doubled-quote escaping produced by the csv module.
    test = spark.read.csv(s3_path, header=False, escape='"').toDF(*df.columns)
else:
    # Save object in standard CSV format; overwrite keeps the cell re-runnable.
    df.write.format("csv").option("header", "true").mode("overwrite").save(s3_path)
    test = spark.read.csv(s3_path, header=True)

test.show()