# Prueba de conexión de Jupyter a Spark-Iceberg (iris)

## 1. Comprobaciones previas

- Versiones de los productos

In [2]:
!python --version

Python 3.8.13


In [4]:
!java -version

openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-8u372-ga~us1-0ubuntu1~22.04-b09)
OpenJDK 64-Bit Server VM (build 25.362-b09, mixed mode)


In [6]:
!find / -iname spark*.jar 2>/dev/null|grep sql

/usr/local/spark-3.3.1-bin-hadoop3/jars/spark-sql_2.12-3.3.1.jar


- Variables de entorno

In [8]:
import os
os.environ["AWS_REGION"]
#os.environ["AWS_ACCESS_KEY_ID"]

'us-east-1'

## 2. Pre-requisitos

In [10]:
!pip install pyspark==3.3.1

Collecting pyspark==3.3.1
  Using cached pyspark-3.3.1-py2.py3-none-any.whl
Collecting py4j==0.10.9.5
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


## 3. Libs y configuración

In [11]:
import os
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, BooleanType
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark import SQLContext

#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.1.0,org.mariadb.jdbc:mariadb-java-client:2.3.0,org.postgresql:postgresql:42.5.4 pyspark-shell'
dst_lh_url  = os.environ.get('DST_LH_URL', 'spark://spark-master-0.spark-headless.demml.svc.cluster.local:7077')
dst_lh_appn = os.environ.get('DST_LH_APPN', 'TestDesdeJupyter20230530g')
import subprocess
my_pod_ip = subprocess.run(['hostname', '-I'], stdout=subprocess.PIPE).stdout.decode('utf-8').strip(' \n\t')
my_pod_ip


'172.21.174.69'

## 4. Conexión al spark

Primera prueba:
- v 3.3_2.12
- Incluyendo libs bases (por pbs con scala, amazon, ...)
- Con io-impl a S3
- Con env-vars de AWS (inyectadas via helm)
- Con provider "simple" para autenticarse en S3 minio

In [12]:
spark2 = SparkSession.builder \
    .master(dst_lh_url) \
    .appName(dst_lh_appn) \
    .config('spark.jars.packages','org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0,org.mariadb.jdbc:mariadb-java-client:2.3.0,org.postgresql:postgresql:42.5.4,software.amazon.awssdk:bundle:2.17.257,software.amazon.awssdk:url-connection-client:2.17.257,software.amazon.awssdk:s3:2.17.257,software.amazon.awssdk:iam:2.17.257,org.apache.hadoop:hadoop-aws:3.2.3') \
    .config('spark.sql.extensions','org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config('spark.sql.catalog.spark_catalog','org.apache.iceberg.spark.SparkSessionCatalog') \
    .config('spark.sql.catalog.spark_catalog.catalog-impl','org.apache.iceberg.jdbc.JdbcCatalog') \
    .config('spark.sql.catalog.spark_catalog.uri','jdbc:postgresql://postgresql:5432/iceberg') \
    .config('spark.driver.host', my_pod_ip) \
    .config('spark.sql.catalog.spark_catalog.jdbc.useSSL','false') \
    .config('spark.sql.catalog.spark_catalog.jdbc.user','iceberg') \
    .config('spark.sql.catalog.spark_catalog.jdbc.password','iceberg') \
    .config('spark.sql.catalog.spark_catalog.warehouse','s3a://warehouse/') \
    .config('spark.sql.catalog.spark_catalog.s3.endpoint','http://minio:9000') \
    .config('spark.sql.catalog.spark_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.defaultCatalog','spark_catalog') \
    .config('spark.hadoop.fs.s3a.endpoint','http://minio:9000') \
    .config('spark.hadoop.fs.s3a.access.key','admin') \
    .config('spark.hadoop.fs.s3a.secret.key','t4bl4red0nd4') \
    .config('spark.hadoop.fs.s3a.path.style.access','true') \
    .config('spark.hadoop.fs.s3a.impl','org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()
spark2

## 5. Comprobaciones tras conexión

In [13]:
spark2.sql("show current namespace").show() 

+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|  default|
+-------------+---------+



In [14]:
spark2.sql("show databases").show() 

+---------+
|namespace|
+---------+
|  default|
+---------+



In [15]:
spark2.sql("show tables").show() 

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



## 6. Creación de tablas

La siguiente instrucción... si ya has creado la tabla previamente... pues no hace falta (fallará)

In [9]:
spark2.sql("CREATE TABLE default.IRIS7 (sepal_length double, sepal_width double, petal_length double, petal_width double, variety string) USING iceberg OPTIONS ('write.object-storage.enabled'=true, 'write.data.path'='s3://warehouse/');").show()

AnalysisException: Table default.IRIS7 already exists

In [7]:
spark2.sql("SELECT * FROM default.IRIS7").show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
+------------+-----------+------------+-----------+-------+



## 6. Cargo datos para meter dentro del iceberg

In [8]:
import sklearn
from sklearn.datasets import load_iris
iris = sklearn.datasets.load_iris()
iris

{'data': array([[5.1, 3.5, 1.4, 0.2],
        [4.9, 3. , 1.4, 0.2],
        [4.7, 3.2, 1.3, 0.2],
        [4.6, 3.1, 1.5, 0.2],
        [5. , 3.6, 1.4, 0.2],
        [5.4, 3.9, 1.7, 0.4],
        [4.6, 3.4, 1.4, 0.3],
        [5. , 3.4, 1.5, 0.2],
        [4.4, 2.9, 1.4, 0.2],
        [4.9, 3.1, 1.5, 0.1],
        [5.4, 3.7, 1.5, 0.2],
        [4.8, 3.4, 1.6, 0.2],
        [4.8, 3. , 1.4, 0.1],
        [4.3, 3. , 1.1, 0.1],
        [5.8, 4. , 1.2, 0.2],
        [5.7, 4.4, 1.5, 0.4],
        [5.4, 3.9, 1.3, 0.4],
        [5.1, 3.5, 1.4, 0.3],
        [5.7, 3.8, 1.7, 0.3],
        [5.1, 3.8, 1.5, 0.3],
        [5.4, 3.4, 1.7, 0.2],
        [5.1, 3.7, 1.5, 0.4],
        [4.6, 3.6, 1. , 0.2],
        [5.1, 3.3, 1.7, 0.5],
        [4.8, 3.4, 1.9, 0.2],
        [5. , 3. , 1.6, 0.2],
        [5. , 3.4, 1.6, 0.4],
        [5.2, 3.5, 1.5, 0.2],
        [5.2, 3.4, 1.4, 0.2],
        [4.7, 3.2, 1.6, 0.2],
        [4.8, 3.1, 1.6, 0.2],
        [5.4, 3.4, 1.5, 0.4],
        [5.2, 4.1, 1.5, 0.1],
  

## 6. Carga de datos en iceberg

In [9]:

spark2.sql("SELECT * FROM default.IRIS7").show()
spark2.sql("INSERT INTO default.IRIS7 VALUES (1.0, 1.0, 1.0, 1.0, 'Setosa') ")
spark2.sql("SELECT * FROM default.IRIS7").show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
+------------+-----------+------------+-----------+-------+

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|variety|
+------------+-----------+------------+-----------+-------+
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
|         1.0|        1.0|         1.0|        1.0| Setosa|
+------------+-----------+------------+

In [10]:
schema2 = StructType([
  StructField("sepal_length", DoubleType(), True),
  StructField("sepal_width", DoubleType(), True),
  StructField("petal_length", DoubleType(), True),
  StructField("petal_width", DoubleType(), True),
  StructField("class", StringType(), True)
])

data2 = ([(1.0, 1.0,1.0,1.0, "Setosa" )])

df2 = spark2.createDataFrame(data2, schema=schema2)

In [11]:
df2.head()

Row(sepal_length=1.0, sepal_width=1.0, petal_length=1.0, petal_width=1.0, class='Setosa')

## Carga directa desde S3

In [16]:
s3_df=spark2.read.parquet("s3a://iris/iris.parq",header=True,inferSchema=True)
s3_df.show(5)

+------------+-----------+------------+-----------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|      class| date_test|
+------------+-----------+------------+-----------+-----------+----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|2020-01-01|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|2020-01-02|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|2020-01-03|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|2020-01-04|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|2020-01-05|
+------------+-----------+------------+-----------+-----------+----------+
only showing top 5 rows

