In [1]:
import pyspark,os,socket

In [2]:
# Spark master URL: the "k8s://" scheme followed by the cluster's API server address.
k8s_master = "k8s://https://0B5513313A4AE2DBB9C7BC6E03F940B9.gr7.us-east-2.eks.amazonaws.com"

# Maven coordinates (group:artifact:version) of the extra packages the session needs.
exampleJars = [
    # Iceberg runtime for Spark
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.0",
    # S3A filesystem connector and the AWS SDK bundle pulled in alongside it
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
]

In [3]:
# Shared settings for the demo; other cells look values up by key.
props = dict(
    # Iceberg catalog: warehouse root, catalog name and catalog type.
    warehouse_path="s3a://granica-demo-spark-warehouse",
    catalog_name="demo",
    catalog_type="hadoop",
    # NOTE(review): same coordinate as the first entry of exampleJars; this key
    # and input_path/mode/output_dir below are not read by the cells visible
    # here -- confirm whether anything else still uses them.
    iceberg_spark_jar="org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.0",
    # Table queried by the analysis cells.
    table_name="logs",
    input_path="really_large_access.log",
    mode="local",
    output_dir="output",
)

In [4]:
# Prefix shared by every setting of the Iceberg catalog, e.g. "spark.sql.catalog.demo".
catalog_conf_prefix = f"spark.sql.catalog.{props['catalog_name']}"

builder = (
    pyspark.sql.SparkSession.builder.appName("Myapp")
    # Spark master URL to connect to.
    .master(k8s_master)
    # Maven coordinates that Spark resolves and downloads when the session starts.
    .config("spark.jars.packages", ",".join(exampleJars))
    ####### Kubernetes executors #######
    .config("spark.kubernetes.container.image", "public.ecr.aws/b1m7t7i1/pyspark-granica-demo:v1")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
    .config("spark.kubernetes.authenticate.serviceAccountName", "spark")
    .config("spark.executor.instances", "1")
    .config("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")
    # The driver runs in this notebook's process: advertise this host's address
    # and a fixed port so executors can connect back to it.
    .config("spark.driver.host", socket.gethostbyname(socket.gethostname()))
    .config("spark.driver.port", "2222")
    ####### Iceberg catalog #######
    # Extends Spark SQL with the Iceberg session extensions.
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config(catalog_conf_prefix, "org.apache.iceberg.spark.SparkCatalog")
    .config(f"{catalog_conf_prefix}.warehouse", props['warehouse_path'])
    .config(f"{catalog_conf_prefix}.type", props['catalog_type'])
    .config("spark.sql.warehouse.dir", props['warehouse_path'])
    ####### S3 access #######
    # No credentials are set here on purpose: never hardcode keys in the notebook.
    # S3A then falls back to its default credential provider chain
    # (NOTE(review): confirm the pods/driver get credentials from the environment or an IAM role).
    .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
    # Keep TLS enabled: traffic to the public S3 endpoint must not travel in plaintext.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # Single attempt and short timeouts (milliseconds per the S3A docs), so
    # connectivity problems surface quickly.
    # NOTE(review): raise attempts.maximum if transient S3 errors start failing jobs.
    .config("spark.hadoop.fs.s3a.attempts.maximum", "1")
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
    .config("spark.hadoop.fs.s3a.connection.timeout", "10000")
)
# Reuses an already-running session if one exists, otherwise starts a new one.
spark = builder.getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a12ed83b-0edb-4989-a639-e8f17b322b54;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.0 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
downloading https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.7.0/iceberg-spark-runtime-3.5_2.12-1.7.0.jar ...
	[SUCCESSFUL ] org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.7.0!iceberg-spark-runtime-3.5_2.12.jar (1045ms)
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-

In [5]:
# Bare expression: the notebook displays the session object as this cell's output.
spark

24/12/03 05:51:37 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [7]:
# Qualified table reference is "<catalog>.<table>"; despite its name, db_name
# holds the Iceberg catalog name from the shared config.
db_name = props['catalog_name']
table_name = props['table_name']

# Top 5 IPs by hit count for each date, newest date first.
#   day_ip_cnt        -- hits per (date, ip)
#   ranked_day_ip_cnt -- rank of each ip within its date, highest hits first
# `ip desc` is a tie-breaker in both orderings: several IPs can share a hit
# count, and without it both the rn<=5 cut-off and the row order would be
# arbitrary from run to run.
top5_ips_query = f"""with day_ip_cnt as (
    select date,ip,count(1) as hits
    from {db_name}.{table_name}
    group by date,ip
),
ranked_day_ip_cnt as (
    select date,ip,hits,
    row_number() over (partition by date order by hits desc,ip desc) as rn
    from day_ip_cnt
)
select date,ip,hits from ranked_day_ip_cnt
where rn<=5
order by date desc,hits desc,ip desc"""

In [8]:
# Run the top-5 query through Spark SQL and keep the resulting DataFrame in `df`.
df = spark.sql(top5_ips_query)

In [9]:
# Print the first 10 rows of the result as a text table.
df.show(10)

                                                                                

+--------+--------------+----+
|    date|            ip|hits|
+--------+--------------+----+
|20211231| 63.143.42.249| 147|
|20211231| 63.143.42.247| 147|
|20211231|    77.88.5.51|  82|
|20211231|213.180.203.97|  76|
|20211231| 5.164.166.184|  75|
|20211230| 63.143.42.249| 288|
|20211230| 63.143.42.247| 288|
|20211230|    77.88.5.51| 120|
|20211230|  91.219.193.2|  88|
|20211230|83.220.239.204|  84|
+--------+--------------+----+
only showing top 10 rows

