#  PySpark II


## Sessão

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=92204e1aba74b4c880e591cd4a16259414534fb25d10de96ff6a924570ee71d0
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
from pyspark.sql import SparkSession
import pyspark as ps
spark = (SparkSession
            .builder
            .appName("Python Spark SQL basic example")
            .config("spark.some.config.option", "some-value")
            .getOrCreate())

In [4]:
conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
conf.set("spark.executor.heartbeatInterval","3600s")

<pyspark.conf.SparkConf at 0x7a5b4b74b580>

## Read csv

In [6]:
df_spark = (spark.read.format("csv")
      .option("header","true")
      .load("Cancer_Data.csv"))
df_spark

DataFrame[id: string, diagnosis: string, radius_mean: string, texture_mean: string, perimeter_mean: string, area_mean: string, smoothness_mean: string, compactness_mean: string, concavity_mean: string, concave points_mean: string, symmetry_mean: string, fractal_dimension_mean: string, radius_se: string, texture_se: string, perimeter_se: string, area_se: string, smoothness_se: string, compactness_se: string, concavity_se: string, concave points_se: string, symmetry_se: string, fractal_dimension_se: string, radius_worst: string, texture_worst: string, perimeter_worst: string, area_worst: string, smoothness_worst: string, compactness_worst: string, concavity_worst: string, concave points_worst: string, symmetry_worst: string, fractal_dimension_worst: string, _c32: string]

In [7]:
df_spark.show(5)

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+--------+---------+-----------+------

## Parquet

In [9]:
df_spark.count()

569

In [10]:
df_spark.write.parquet('cancer_parquet.parquet',
                       mode='overwrite',
                       partitionBy='diagnosis')

In [12]:
df = spark.read.parquet('cancer_parquet.parquet')

df.count()

569

## Operações

In [14]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, lit, when

In [15]:
df_spark.show(2)

+------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|    id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+------+---------+-----------+------------

#### Agrupamento

In [16]:
df_agg = (df_spark
          .groupBy('diagnosis')
          .count())

df_agg.show()

+---------+-----+
|diagnosis|count|
+---------+-----+
|        B|  357|
|        M|  212|
+---------+-----+



In [17]:
df_agg = (df_spark
          .groupBy('diagnosis')
          .agg(F.count('radius_mean').alias('n'),
               F.min('radius_mean').alias('min_radius_mean'),
               F.mean('radius_mean').alias('avg_radius_mean'),
               F.max('radius_mean').alias('max_radius_mean')))

df_agg.show()

+---------+---+---------------+-----------------+---------------+
|diagnosis|  n|min_radius_mean|  avg_radius_mean|max_radius_mean|
+---------+---+---------------+-----------------+---------------+
|        B|357|          10.03|12.14652380952381|          9.904|
|        M|212|          10.95|17.46283018867925|          28.11|
+---------+---+---------------+-----------------+---------------+



In [None]:
df_agg = (df_spark
          .withColumn('bigger',col('area_mean')>1000)
          .groupBy('bigger')
          .pivot('diagnosis')
          .count())

df_agg.show()

+------+----+---+
|bigger|   B|  M|
+------+----+---+
|  true|null| 92|
| false| 357|120|
+------+----+---+



 #### Ordenação

In [None]:
df_spark.show(2)

+------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|    id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+------+---------+-----------+------------

In [22]:
# df_sort = (df_spark.sort('radius_mean'))
df_sort = (df_spark.orderBy('radius_mean', ascending=False))
df_sort.show(3)

+------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|    id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+------+---------+-----------+------------

In [23]:
df_sort = (df_spark.orderBy(F.desc('radius_mean')))
df_sort.show(3)

+------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|    id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+------+---------+-----------+------------

In [None]:
df_sort = (df_spark.orderBy('diagnosis',F.desc('radius_mean')))
df_sort.show(3)

+------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|    id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+------+---------+-----------+------------

### Join

In [24]:
df1 = df_spark.select('id','diagnosis').limit(100)
df2 = df_spark.select('id','radius_mean').limit(50)

In [25]:
print(df1.count())
df1.show(2)

100
+------+---------+
|    id|diagnosis|
+------+---------+
|842302|        M|
|842517|        M|
+------+---------+
only showing top 2 rows



In [26]:
print(df2.count())
df2.show(2)

50
+------+-----------+
|    id|radius_mean|
+------+-----------+
|842302|      17.99|
|842517|      20.57|
+------+-----------+
only showing top 2 rows



In [28]:
df_join = df1.join(df2,'id','inner')

print(df_join.count())
df_join.show(3)

50
+--------+---------+-----------+
|      id|diagnosis|radius_mean|
+--------+---------+-----------+
|  842302|        M|      17.99|
|  842517|        M|      20.57|
|84300903|        M|      19.69|
+--------+---------+-----------+
only showing top 3 rows



In [32]:
df_join = df1.join(df2,'id','outer')

print(df_join.count())
df_join.orderBy('radius_mean', ascending=True).show(3)

100
+------+---------+-----------+
|    id|diagnosis|radius_mean|
+------+---------+-----------+
|857373|        B|       NULL|
|857438|        M|       NULL|
|857374|        B|       NULL|
+------+---------+-----------+
only showing top 3 rows



### Cache

In [35]:
# df_agg = df_spark.orderBy('id')
for i in range(100):
    df_spark.count()

In [36]:
df_spark.cache()
for i in range(100):
    df_spark.count()

In [38]:
df_spark.unpersist()

DataFrame[id: string, diagnosis: string, radius_mean: string, texture_mean: string, perimeter_mean: string, area_mean: string, smoothness_mean: string, compactness_mean: string, concavity_mean: string, concave points_mean: string, symmetry_mean: string, fractal_dimension_mean: string, radius_se: string, texture_se: string, perimeter_se: string, area_se: string, smoothness_se: string, compactness_se: string, concavity_se: string, concave points_se: string, symmetry_se: string, fractal_dimension_se: string, radius_worst: string, texture_worst: string, perimeter_worst: string, area_worst: string, smoothness_worst: string, compactness_worst: string, concavity_worst: string, concave points_worst: string, symmetry_worst: string, fractal_dimension_worst: string, _c32: string]

### Particoes

In [39]:
df_part = df_spark.repartition(10)
df_part.rdd.getNumPartitions()

10

In [None]:
df_spark.rdd.getNumPartitions()

1