In [21]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

In [3]:
AWS_S3_ENDPOINT = "http://minio:9000"
AWS_BUCKET_NAME = "data-platform-core"
EXTRA_CLASS_PATH_JARS = ",".join(
    os.path.join("/home/jovyan/work/.jars", p) for p in os.listdir("/home/jovyan/work/.jars")
)

In [4]:
spark = SparkSession.builder \
        .appName("Session of avitkovskiy") \
        .master("spark://spark-master:7077") \
        .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
        .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID")) \
        .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY")) \
        .config("spark.hadoop.fs.s3a.endpoint", AWS_S3_ENDPOINT) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.spark_catalog.type", "hive") \
        .config("spark.sql.catalog.spark_catalog.uri", "thrift://hive-metastore:9083") \
        .config("spark.sql.catalog.spark_catalog.warehouse", f"s3a://{AWS_BUCKET_NAME}/") \
        .config("spark.sql.warehouse.dir", f"s3a://{AWS_BUCKET_NAME}/") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
        .config("spark.jars", EXTRA_CLASS_PATH_JARS) \
        .enableHiveSupport() \
        .getOrCreate()

In [11]:
df = spark.read.option("header", True).csv("s3a://raw-data/csv/games/fifa/2018_FIFA_WCS.csv")

In [22]:
df.groupBy("Team") \
    .agg(count("*").alias("count")
     ) \
    .show(truncate=False)

+-----------+-----+
|Team       |count|
+-----------+-----+
|Russia     |23   |
|Senegal    |23   |
|Sweden     |23   |
|Germany    |23   |
|France     |23   |
|Argentina  |23   |
|Belgium    |23   |
|Peru       |23   |
|Croatia    |23   |
|Nigeria    |23   |
|Spain      |23   |
|Denmark    |23   |
|Iran       |23   |
|Morocco    |23   |
|Panama     |23   |
|Iceland    |23   |
|South Korea|23   |
|Uruguay    |23   |
|Mexico     |23   |
|Tunisia    |23   |
+-----------+-----+
only showing top 20 rows



In [23]:
df.filter(df.Team == "Russia").show(truncate=False)

+------+-----+------------+--------+-----------------------+-------------+---+----+-----+----------------------+------------+
|Team  |Group|Squad Number|Position|Player                 |Date Of Birth|Age|Caps|Goals|Club                  |Player Count|
+------+-----+------------+--------+-----------------------+-------------+---+----+-----+----------------------+------------+
|Russia|A    |1           |GK      |Igor Akinfeev (captain)|1986/04/08   |32 |105 |0    |CSKA Moscow           |1           |
|Russia|A    |2           |DF      |Mário Fernandes        |1990/09/19   |27 |4   |0    |CSKA Moscow           |1           |
|Russia|A    |3           |DF      |Ilya Kutepov           |1993/07/29   |24 |6   |0    |Spartak Moscow        |1           |
|Russia|A    |4           |DF      |Sergei Ignashevich     |1979/07/14   |38 |121 |8    |CSKA Moscow           |1           |
|Russia|A    |5           |DF      |Andrei Semyonov        |1989/03/24   |29 |6   |0    |Akhmat Grozny         |1     