In [1]:
from pyspark import SparkConf
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import *
from pyspark.sql.functions import explode, split, col, sum, lit
from pyspark.sql import SparkSession

In [6]:
def getS3Session(access_key='lakefs', secret_key='lakefs', ceph_host='endpoint', ceph_port=443):
    """Return a SparkSession configured to reach an S3-compatible store through s3a.

    The session is obtained with ``SparkSession.builder.getOrCreate()`` and the
    s3a settings are written into its Hadoop configuration.

    Args:
        access_key: Value stored under ``fs.s3a.access.key``.
        secret_key: Value stored under ``fs.s3a.secret.key``.
        ceph_host: Host of the S3 endpoint. It may also be given as a full URL
            (``scheme://host[:port]``) or as ``host:port``; in both of those
            cases it is used verbatim.
        ceph_port: Port appended to ``ceph_host`` when the host carries neither
            a scheme nor a port of its own. Pass ``None`` to leave the host
            untouched.

    Returns:
        The configured SparkSession.

    NOTE(review): the default credentials are hard-coded in the signature; prefer
    passing values read from the environment when calling this function.
    """
    spark = SparkSession.builder \
        .appName("app_name") \
        .getOrCreate()

    # ceph_port used to be silently ignored. Only a bare host name gets the port
    # appended, so callers that already pass "host:port" or a full URL keep the
    # exact endpoint they asked for.
    endpoint = ceph_host
    if (
        ceph_port is not None
        and isinstance(ceph_host, str)
        and "://" not in ceph_host
        and ":" not in ceph_host
    ):
        endpoint = "{}:{}".format(ceph_host, ceph_port)

    # Static access/secret key authentication. For temporary credentials the
    # provider would be org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider
    # together with an fs.s3a.session.token entry.
    s3a_settings = (
        ("fs.s3a.access.key", access_key),
        ("fs.s3a.secret.key", secret_key),
        ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
        ("com.amazonaws.services.s3.enableV4", "true"),
        ("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
        ("fs.s3a.endpoint", endpoint),
        ("fs.s3a.path.style.access", "true"),
    )

    # Resolve the Hadoop configuration once instead of once per setting.
    hadoop_conf = spark._jsc.hadoopConfiguration()
    for key, value in s3a_settings:
        hadoop_conf.set(key, value)
    return spark

In [7]:
# Explicit schema for the CSV so Spark applies these column types instead of
# inferring them. Every column is nullable.
custom_schema = (
    StructType()
    .add("bs", StringType(), True)
    .add("iodepth", StringType(), True)
    .add("bw_mean", DoubleType(), True)
    .add("bw_min", DoubleType(), True)
    .add("bw_max", DoubleType(), True)
    .add("bw_dev", DoubleType(), True)
    .add("iops_mean", DoubleType(), True)
    .add("iops_min", IntegerType(), True)
    .add("iops_max", IntegerType(), True)
    .add("iops_stddev", DoubleType(), True)
)

# Session whose s3a settings point at the target server.
spark = getS3Session(ceph_host='some_server')

# Load the CSV from the bucket; the first line of the file is a header row.
df = (
    spark.read
    .schema(custom_schema)
    .option("header", True)
    .csv('s3a://lakefs/moderatelyBig.csv')
)

# Print a preview of the loaded rows.
df.show()

+------------+-------------+------------------+------------------+------------------+------------------+---------+--------+--------+------------------+
|          bs|      iodepth|           bw_mean|            bw_min|            bw_max|            bw_dev|iops_mean|iops_min|iops_max|       iops_stddev|
+------------+-------------+------------------+------------------+------------------+------------------+---------+--------+--------+------------------+
|0xf6c762505c| 195035318375| 589604.2258611814| 82537.06629591854|  66310.9234002498| 945.0269019000774| 310202.0|  584624|   62881| 315.7031103618132|
|0xc44640f5e9| 783525496224|109346.70062874281| 560608.3284556706| 730399.9118590021| 615.3322330775512| 291703.0|  634439|   63721|  684.768916778562|
|0x75f87c68e5| 661659213340|  650226.047027243|1014028.1056788351|  792205.491263445| 864.5815486096335| 373873.0|  212455|  646345|  693.526209532466|
|0x867791db4e| 697369883188|454500.33719573624| 176212.1333213721| 599515.7229642089|  8

In [8]:
%%bash
# List the buckets visible to these credentials on the S3 endpoint.
# TLS is used, but certificate verification is turned off.
# NOTE(review): the credentials are passed in clear text on the command line.
s3cmd --ssl \
    --access_key=lakefs \
    --secret_key=lakefs \
    --host=some_server:443 \
    --no-check-certificate \
    ls

2022-03-10 11:47  s3://lakefs
