# Generating and Saving Data to S3 from PySpark
Author: Imad Ali  
Date: 2021-07-22

In [1]:
import os
import numpy as np
import pandas as pd
import boto3
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

To let PySpark write to S3, you need your AWS credentials set up, and you need to download the relevant jars from Maven so that Spark can talk to AWS services:
* hadoop-aws
* aws-java-sdk-bundle

Since we're working within a Python virtual environment, these jars need to be downloaded into `.../site-packages/pyspark/jars`. If you're not using a virtual environment, they need to be downloaded into `~/spark/spark-<version>-bin-hadoop<version>/jars` instead.

You can go to the [spark core](https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12/3.1.2) page on Maven to find the hadoop-aws jar that matches the version of Spark you're using. The hadoop-aws version should match the Hadoop version that Spark was built against. Then find the matching aws-java-sdk-bundle dependency on the hadoop-aws page. In this example the Spark version is 3.1.2, so I'm using hadoop-aws-3.2.0.jar and aws-java-sdk-bundle-1.11.375.jar.

```
# bash
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar -P ~/.venv/sandbox/lib/python3.8/site-packages/pyspark/jars/

wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.375/aws-java-sdk-bundle-1.11.375.jar -P ~/.venv/sandbox/lib/python3.8/site-packages/pyspark/jars/
```

(This [page](https://www.jitsejan.com/integrating-pyspark-notebook-with-s3) provides detail on how to do this within a Docker container.)

In [2]:
# Build a SparkSession whose S3A filesystem authenticates with whatever
# credentials boto3's default chain resolves.
creds = boto3.Session().get_credentials()
if creds is None:
    raise RuntimeError('No AWS credentials found; configure them for boto3 before starting Spark')
# Take one consistent snapshot (access key, secret key, token) so refreshable
# credentials cannot rotate between the reads below.
frozen_creds = creds.get_frozen_credentials()

sparkConf = SparkConf()
sparkConf.set("spark.hadoop.fs.s3a.access.key", frozen_creds.access_key)
sparkConf.set("spark.hadoop.fs.s3a.secret.key", frozen_creds.secret_key)
if frozen_creds.token:
    # Temporary (STS/SSO) credentials are only valid together with their
    # session token, which S3A reads via this provider.
    sparkConf.set("spark.hadoop.fs.s3a.session.token", frozen_creds.token)
    sparkConf.set("spark.hadoop.fs.s3a.aws.credentials.provider",
                  "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
sparkConf.set("spark.hadoop.fs.s3a.path.style.access", "true")
sparkConf.set("spark.hadoop.fs.s3a.endpoint", "s3-us-east-2.amazonaws.com")
sparkConf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
# The AWS SDK reads enableV4 as a JVM system property, so it is passed via
# extraJavaOptions for both the driver and the executors. A bare
# "com.amazonaws..." conf key is not a Spark property and would be ignored.
sparkConf.set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
sparkConf.set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

In [3]:
# Confirm which Spark version the session is running (the jar versions above depend on it).
print(f'Spark version: {spark.version}')

Spark version: 3.1.2


In [4]:
def data_generator(size: str, seed: "int | None" = None) -> pd.DataFrame:
    """Generate a synthetic three-column DataFrame of a named size.

    Parameters
    ----------
    size : str
        One of ``'small'`` (1,000 rows), ``'medium'`` (100,000 rows) or
        ``'large'`` (1,000,000 rows).
    seed : int, optional
        Seed for a dedicated ``numpy.random.Generator``, which makes the output
        reproducible. When omitted, numpy's global legacy random state is used,
        so ``np.random.seed(...)`` still controls the result.

    Returns
    -------
    pd.DataFrame
        Columns ``v0`` (letters 'A'-'E' drawn uniformly with replacement),
        ``v1`` (0/1 draws with p=0.5) and ``v2`` (standard-normal draws).

    Raises
    ------
    ValueError
        If ``size`` is not one of the names above. ValueError subclasses
        Exception, so callers catching ``Exception`` keep working.
    """
    row_counts = {'small': 1_000, 'medium': 100_000, 'large': 1_000_000}
    # The isinstance guard keeps unhashable inputs from raising TypeError on the dict lookup.
    if not isinstance(size, str) or size not in row_counts:
        raise ValueError('Size must be one of ["small","medium","large"]')
    n_rows = row_counts[size]
    # Both the np.random module and a Generator expose choice/binomial/normal with these signatures.
    rng = np.random if seed is None else np.random.default_rng(seed)
    return pd.DataFrame({'v0': rng.choice(['A', 'B', 'C', 'D', 'E'], size=n_rows, replace=True),
                         'v1': rng.binomial(1, 0.5, n_rows),
                         'v2': rng.normal(0, 1, n_rows)})

In [5]:
# Pick a dataset size, generate it, then report its shape and a preview.
data_size = 'large'
data = data_generator(data_size)
shape_line = f'Data shape: {data.shape}\n'
print(shape_line)
print(data.head())

Data shape: (1000000, 3)

  v0  v1        v2
0  A   0 -0.090462
1  E   0  0.386369
2  A   0 -1.360810
3  A   1  0.893191
4  D   1 -0.636711


In [6]:
# Convert the pandas frame into a Spark DataFrame and preview the first rows.
# DataFrame.show() prints the table itself and returns None, so it is not
# wrapped in print() (which would emit a stray "None" line).
spark_data = spark.createDataFrame(data)
spark_data.show(5)

+---+---+--------------------+
| v0| v1|                  v2|
+---+---+--------------------+
|  A|  0|-0.09046177847118464|
|  E|  0| 0.38636902758408764|
|  A|  0| -1.3608098979066194|
|  A|  1|  0.8931909633636685|
|  D|  1| -0.6367107539467987|
+---+---+--------------------+
only showing top 5 rows

None


Write to S3

In [7]:
# Write the same dataset to S3 in two formats, under one key per format.
s3_bucket = 'imad-pyspark-test/'
csv_key = 'data/csv/' + data_size + '.csv'
parquet_key = 'data/parquet/' + data_size

# pandas writes one CSV object (it needs the s3fs package for s3a:// URLs).
# Spark's DataFrameWriter.csv would instead write a directory of part files.
data.to_csv('s3a://' + s3_bucket + csv_key, index=False)

# Spark writes the Parquet dataset as a directory, replacing any previous run.
spark_data.write.parquet('s3a://' + s3_bucket + parquet_key, mode='overwrite')

# Keep the original name bound to the last-written key, in case later cells use it.
s3_key = parquet_key