In [1]:
import sys, os
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, Catalog
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import Row
from subprocess import check_output

SPARK_DRIVER_HOST = check_output(["hostname", "-i"]).decode(encoding="utf-8").strip()
spark_conf = SparkConf()
spark_conf.setAll([
    ('spark.master', 'spark://spark:7077'),
    ('spark.app.name', 'myApp'),
    ('spark.submit.deployMode', 'client'),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.eventLog.enabled', 'false'),
    ('spark.logConf', 'false'),
    ('spark.driver.bindAddress', '0.0.0.0'),
    ('spark.driver.host', SPARK_DRIVER_HOST),
    ('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.704,org.apache.spark:spark-hadoop-cloud_2.12:3.3.0'),
    ("spark.hadoop.fs.s3a.endpoint", 'http://minio:9000'),
    ('spark.hadoop.fs.s3a.access.key', 'minio-root-user'),
    ('spark.hadoop.fs.s3a.secret.key', 'minio-root-password'),
    ('spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled', True),
    ("spark.hadoop.fs.s3a.fast.upload", True),
    ("spark.hadoop.fs.s3a.path.style.access", True),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
])
 
spark_sess          = SparkSession.builder.config(conf=spark_conf).getOrCreate()
spark_ctxt          = spark_sess.sparkContext
spark_reader        = spark_sess.read
spark_streamReader  = spark_sess.readStream
spark_ctxt.setLogLevel("WARN")

citiesDF = spark_sess.read.option("header",True).csv('s3a://cities/cities.csv')

citiesDF.show(truncate=False)

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.spark#spark-hadoop-cloud_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-264db003-3a62-49ec-ae67-c55fe38f414b;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.704 in central
	found org.apache.spark#spark-hadoop-cloud_2.12;3.3.0 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bund

22/10/03 21:34:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/03 21:35:00 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

+-----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
|LatD | "LatM"| "LatS"| "NS"| "LonD"| "LonM"| "LonS"| "EW"| "City"           | "State"|
+-----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
|   41|    5  |   59  | "N" |     80|   39  |    0  | "W" | "Youngstown"     | OH     |
|   42|   52  |   48  | "N" |     97|   23  |   23  | "W" | "Yankton"        | SD     |
|   46|   35  |   59  | "N" |    120|   30  |   36  | "W" | "Yakima"         | WA     |
|   42|   16  |   12  | "N" |     71|   48  |    0  | "W" | "Worcester"      | MA     |
|   43|   37  |   48  | "N" |     89|   46  |   11  | "W" | "Wisconsin Dells"| WI     |
|   36|    5  |   59  | "N" |     80|   15  |    0  | "W" | "Winston-Salem"  | NC     |
|   49|   52  |   48  | "N" |     97|    9  |    0  | "W" | "Winnipeg"       | MB     |
|   39|   11  |   23  | "N" |     78|    9  |   36  | "W" | "Winchester"     | VA     |
|   34|   14  |   24  | "N" |   

In [2]:
citiesDF.columns

['LatD',
 ' "LatM"',
 ' "LatS"',
 ' "NS"',
 ' "LonD"',
 ' "LonM"',
 ' "LonS"',
 ' "EW"',
 ' "City"',
 ' "State"']

In [7]:
from pyspark.sql.functions import col, column
cleanCitiesDF = citiesDF \
   .withColumnRenamed(' "LatM"', 'LatM') \
   .withColumnRenamed(' "LonM"', 'LonM') \
   .select(col('LatM'), col('LonM'))
cleanCitiesDF

DataFrame[LatM: string, LonM: string]

In [13]:
manipulatedFrame = citiesDF \
   .withColumnRenamed(' "LatM"', 'LatM') \
   .withColumnRenamed(' "LonM"', 'LonM')

manipulatedFrame.filter('LatM < 10').show()

+-----+-----+-------+-----+-------+-----+-------+-----+-----------------+--------+
| LatD| LatM| "LatS"| "NS"| "LonD"| LonM| "LonS"| "EW"|           "City"| "State"|
+-----+-----+-------+-----+-------+-----+-------+-----+-----------------+--------+
|   41|    5|     59|  "N"|     80|   39|      0|  "W"|     "Youngstown"|      OH|
|   36|    5|     59|  "N"|     80|   15|      0|  "W"|  "Winston-Salem"|      NC|
|   48|    9|      0|  "N"|    103|   37|     12|  "W"|      "Williston"|      ND|
|   40|    4|     11|  "N"|     80|   43|     12|  "W"|       "Wheeling"|      WV|
|   46|    4|     11|  "N"|    118|   19|     48|  "W"|    "Walla Walla"|      WA|
|   43|    6|     36|  "N"|     75|   13|     48|  "W"|          "Utica"|      NY|
|   36|    9|     35|  "N"|     95|   54|     36|  "W"|          "Tulsa"|      OK|
|   39|    2|     59|  "N"|     95|   40|     11|  "W"|         "Topeka"|      KS|
|   43|    2|     59|  "N"|     76|    9|      0|  "W"|       "Syracuse"|      NY|
|   

In [15]:
manipulatedFrame.head()

Row(LatD='   41', LatM='    5',  "LatS"='   59',  "NS"=' "N"',  "LonD"='     80', LonM='   39',  "LonS"='    0',  "EW"=' "W"',  "City"=' "Youngstown"',  "State"=' OH')

In [4]:
cleanCitiesDF.show()

+-----+-----+
| LatM| LonM|
+-----+-----+
|    5|   39|
|   52|   23|
|   35|   30|
|   16|   48|
|   37|   46|
|    5|   15|
|   52|    9|
|   11|    9|
|   14|   55|
|   45|   33|
|    9|   37|
|   15|    0|
|   40|   16|
|   54|   29|
|   41|   20|
|    4|   43|
|   43|    3|
|   25|   19|
|   25|   23|
|   13|   20|
+-----+-----+
only showing top 20 rows



In [5]:
cleanCitiesDF.write.format("csv").option("header",  True).save("s3a://cities/cleanCities4.csv")

                                                                                

In [6]:
citiesDFRecovered = spark_sess.read.option("header",True).csv('s3a://cities/cleanCities4.csv')
citiesDFRecovered.show()

+----+----+
|LatM|LonM|
+----+----+
|   5|  39|
|  52|  23|
|  35|  30|
|  16|  48|
|  37|  46|
|   5|  15|
|  52|   9|
|  11|   9|
|  14|  55|
|  45|  33|
|   9|  37|
|  15|   0|
|  40|  16|
|  54|  29|
|  41|  20|
|   4|  43|
|  43|   3|
|  25|  19|
|  25|  23|
|  13|  20|
+----+----+
only showing top 20 rows



In [18]:
Team = Row("name", "city", "stadium")
afcNorth = [Team("Bengals", "Cincinnati", "Paul Brown Stadium"),
            Team("Steelers", "Pittsburgh", "Heinz Field"),
            Team("Browns", "Cleveland", "FirstEnergy Field"),
            Team("Ravens", "Baltimore", "M&T Bank Stadium")]
afcNorthDataFrame = spark_sess.createDataFrame(afcNorth)
afcNorthDataFrame.show()

[Stage 8:>                                                          (0 + 1) / 1]

+--------+----------+------------------+
|    name|      city|           stadium|
+--------+----------+------------------+
| Bengals|Cincinnati|Paul Brown Stadium|
|Steelers|Pittsburgh|       Heinz Field|
|  Browns| Cleveland| FirstEnergy Field|
|  Ravens| Baltimore|  M&T Bank Stadium|
+--------+----------+------------------+



                                                                                

In [29]:
rdd = spark_ctxt.parallelize([(1, "One"), (30, "Thirty")])
dfFromParallelize = rdd.toDF()
dfFromParallelize.show()

+---+------+
| _1|    _2|
+---+------+
|  1|   One|
| 30|Thirty|
+---+------+



In [33]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

schema = StructType([ \
    StructField("number",IntegerType(),True), \
    StructField("number_spelled",StringType(),True) \
  ])

In [36]:
renewedDF = spark_sess.createDataFrame(rdd, schema)
renewedDF.show()

+------+--------------+
|number|number_spelled|
+------+--------------+
|     1|           One|
|    30|        Thirty|
+------+--------------+



In [2]:
Song = Row("name", "artist", "well_known_lyrics")
songs = [Song("Mary had a little lamb", "Unknown", "Mary had a little little lamb it's fleece was white as snow"),
            Song("Somebody's watching me", "Rockwell", "I always fear about it somebody watching me"),
            Song("Stayin' Alive", "Bee Gees", "Well, you can tell by the way I use my walkI'm a woman's man, no time to talk")]
songsDF = spark_sess.createDataFrame(songs)
songsDF.show()

+--------------------+--------+--------------------+
|                name|  artist|   well_known_lyrics|
+--------------------+--------+--------------------+
|Mary had a little...| Unknown|Mary had a little...|
|Somebody's watchi...|Rockwell|I always fear abo...|
|       Stayin' Alive|Bee Gees|Well, you can tel...|
+--------------------+--------+--------------------+



                                                                                

In [22]:
songsDF.toPandas()

                                                                                

Unnamed: 0,name,artist,well_known_lyrics
0,Mary had a little lamb,Unknown,Mary had a little little lamb it's fleece was ...
1,Somebody's watching me,Rockwell,I always fear about it somebody watching me
2,Stayin' Alive,Bee Gees,"Well, you can tell by the way I use my walkI'm..."


In [4]:
songsDF.withColumn('wellknownlyriccount', F.size(F.split(F.col('well_known_lyrics'), ' '))).show()

+--------------------+--------+--------------------+-------------------+
|                name|  artist|   well_known_lyrics|wellknownlyriccount|
+--------------------+--------+--------------------+-------------------+
|Mary had a little...| Unknown|Mary had a little...|                 12|
|Somebody's watchi...|Rockwell|I always fear abo...|                  8|
|       Stayin' Alive|Bee Gees|Well, you can tel...|                 18|
+--------------------+--------+--------------------+-------------------+



In [38]:
psdf = spark_ctxt.parallelize([1,3,10,103,400,900,10000,100009,30000,50000])
psdf.getNumPartitions()
psdf2 = psdf.repartition(5)
psdf.getNumPartitions()
psdf2.getNumPartitions()
psdf2.glom().collect()

                                                                                

[[], [1, 3, 10, 103, 400], [], [900, 10000, 100009, 30000, 50000], []]

In [43]:
citiesDF.show()

+-----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
| LatD| "LatM"| "LatS"| "NS"| "LonD"| "LonM"| "LonS"| "EW"|            "City"| "State"|
+-----+-------+-------+-----+-------+-------+-------+-----+------------------+--------+
|   41|      5|     59|  "N"|     80|     39|      0|  "W"|      "Youngstown"|      OH|
|   42|     52|     48|  "N"|     97|     23|     23|  "W"|         "Yankton"|      SD|
|   46|     35|     59|  "N"|    120|     30|     36|  "W"|          "Yakima"|      WA|
|   42|     16|     12|  "N"|     71|     48|      0|  "W"|       "Worcester"|      MA|
|   43|     37|     48|  "N"|     89|     46|     11|  "W"| "Wisconsin Dells"|      WI|
|   36|      5|     59|  "N"|     80|     15|      0|  "W"|   "Winston-Salem"|      NC|
|   49|     52|     48|  "N"|     97|      9|      0|  "W"|        "Winnipeg"|      MB|
|   39|     11|     23|  "N"|     78|      9|     36|  "W"|      "Winchester"|      VA|
|   34|     14|     24|  "N"|   

In [45]:
citiesDF.columns

['LatD',
 ' "LatM"',
 ' "LatS"',
 ' "NS"',
 ' "LonD"',
 ' "LonM"',
 ' "LonS"',
 ' "EW"',
 ' "City"',
 ' "State"']

In [49]:
cleanedCitiesDF = citiesDF \
   .withColumnRenamed(' "LatM"', 'LatM') \
   .withColumnRenamed(' "LonM"', 'LonM') \
   .withColumnRenamed(' "State"', 'State')

cleanedCitiesDF.rdd.glom().collect()

[[Row(LatD='   41', LatM='    5',  "LatS"='   59',  "NS"=' "N"',  "LonD"='     80', LonM='   39',  "LonS"='    0',  "EW"=' "W"',  "City"=' "Youngstown"', State=' OH'),
  Row(LatD='   42', LatM='   52',  "LatS"='   48',  "NS"=' "N"',  "LonD"='     97', LonM='   23',  "LonS"='   23',  "EW"=' "W"',  "City"=' "Yankton"', State=' SD'),
  Row(LatD='   46', LatM='   35',  "LatS"='   59',  "NS"=' "N"',  "LonD"='    120', LonM='   30',  "LonS"='   36',  "EW"=' "W"',  "City"=' "Yakima"', State=' WA'),
  Row(LatD='   42', LatM='   16',  "LatS"='   12',  "NS"=' "N"',  "LonD"='     71', LonM='   48',  "LonS"='    0',  "EW"=' "W"',  "City"=' "Worcester"', State=' MA'),
  Row(LatD='   43', LatM='   37',  "LatS"='   48',  "NS"=' "N"',  "LonD"='     89', LonM='   46',  "LonS"='   11',  "EW"=' "W"',  "City"=' "Wisconsin Dells"', State=' WI'),
  Row(LatD='   36', LatM='    5',  "LatS"='   59',  "NS"=' "N"',  "LonD"='     80', LonM='   15',  "LonS"='    0',  "EW"=' "W"',  "City"=' "Winston-Salem"', State=

In [53]:
partitionedRange = cleanedCitiesDF.repartitionByRange(10, "State")
partitionedRange.rdd.getNumPartitions()

10

In [57]:

partitionedRange.rdd.map(lambda r: r['State']).glom().collect()

[[' CA',
  ' BC',
  ' AL',
  ' AZ',
  ' CA',
  ' AL',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA'],
 [' DE', ' FL', ' CT', ' DC', ' CO', ' FL', ' FL', ' CO', ' FL', ' CO', ' FL'],
 [' GA',
  ' IL',
  ' IA',
  ' IN',
  ' GA',
  ' ID',
  ' IN',
  ' GA',
  ' IL',
  ' IN',
  ' IA',
  ' GA',
  ' IL',
  ' IN'],
 [' MA',
  ' MB',
  ' KS',
  ' MI',
  ' KS',
  ' MA',
  ' LA',
  ' MI',
  ' MD',
  ' KS',
  ' MI',
  ' MI'],
 [' NC',
  ' NC',
  ' MS',
  ' MS',
  ' MO',
  ' MO',
  ' NB',
  ' MN',
  ' MO',
  ' MO',
  ' MN',
  ' NC',
  ' MN'],
 [' ND', ' NY', ' ND', ' NY', ' NJ', ' NY', ' NY', ' NM', ' NM', ' NY', ' NV'],
 [' OH',
  ' PA',
  ' PA',
  ' PA',
  ' OK',
  ' ON',
  ' OH',
  ' PA',
  ' OH',
  ' OH',
  ' OK',
  ' PA',
  ' OH',
  ' OR',
  ' PA',
  ' OH '],
 [' SD',
  ' TX',
  ' SD',
  ' TX',
  ' TX',
  ' TX',
  ' TX',
  ' SC',
  ' SD',
  ' TX',
  ' TX',
  ' TX',
  ' SA'],
 [' WA',
  ' VA',
  ' WA',
  ' WA',
  ' WA',
  ' VA',
  ' WA',
  ' WA',
  ' U

In [59]:
partitionedRange.coalesce(3).rdd.map(lambda r: r['State']).glom().collect()

[[' CA',
  ' BC',
  ' AL',
  ' AZ',
  ' CA',
  ' AL',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' CA',
  ' MA',
  ' MB',
  ' KS',
  ' MI',
  ' KS',
  ' MA',
  ' LA',
  ' MI',
  ' MD',
  ' KS',
  ' MI',
  ' MI',
  ' OH',
  ' PA',
  ' PA',
  ' PA',
  ' OK',
  ' ON',
  ' OH',
  ' PA',
  ' OH',
  ' OH',
  ' OK',
  ' PA',
  ' OH',
  ' OR',
  ' PA',
  ' OH ',
  ' WI',
  ' WV',
  ' WV',
  ' WI',
  ' WI',
  ' WY',
  ' WY',
  ' WI'],
 [' DE',
  ' FL',
  ' CT',
  ' DC',
  ' CO',
  ' FL',
  ' FL',
  ' CO',
  ' FL',
  ' CO',
  ' FL',
  ' NC',
  ' NC',
  ' MS',
  ' MS',
  ' MO',
  ' MO',
  ' NB',
  ' MN',
  ' MO',
  ' MO',
  ' MN',
  ' NC',
  ' MN',
  ' SD',
  ' TX',
  ' SD',
  ' TX',
  ' TX',
  ' TX',
  ' TX',
  ' SC',
  ' SD',
  ' TX',
  ' TX',
  ' TX',
  ' SA'],
 [' GA',
  ' IL',
  ' IA',
  ' IN',
  ' GA',
  ' ID',
  ' IN',
  ' GA',
  ' IL',
  ' IN',
  ' IA',
  ' GA',
  ' IL',
  ' IN',
  ' ND',
  ' NY',
  ' ND',
  ' NY',
  ' NJ',
  ' NY',
  ' NY',
  ' NM'