In [2]:
# findspark locates the local Spark installation so that the `pyspark` imports
# in the next cell can be resolved. spark_home=None is the default: findspark
# picks the location itself (e.g. from SPARK_HOME).
import findspark

findspark.init(spark_home=None)

In [3]:

import sys
import os
from operator import add

from pyspark.sql import SparkSession

In [4]:
# Create the SparkSession used by every cell below, or reuse the one already
# running in this kernel.
spark = (
    SparkSession.builder
    .appName("PythonWordCount")
    .getOrCreate()
)

In [5]:
# A single-column DataFrame named "number" holding the integers 0..999
# (range(0, 1000) is equivalent to range(1000)).
myRange = spark.range(0, 1000).toDF("number")

# Keep only the even numbers. This only defines a transformation; rows are
# computed when an action such as show() is called on it later.
divisBy2 = myRange.where(myRange["number"] % 2 == 0)

In [6]:
# Root directory containing the Spark-The-Definitive-Guide checkout.
# Set the SPARK_PROJECTS_DIR environment variable to point elsewhere instead of
# editing this machine-specific absolute path; the original path stays the default.
path = os.environ.get("SPARK_PROJECTS_DIR", "/home/voyiger/projects")

# header: treat the first CSV line as column names.
# inferSchema: let Spark assign column types from the data, so numeric columns
# such as `count` can be aggregated (max/sum below) without manual casting.
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv(os.path.join(path, "Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv"))

In [7]:
# Print the first 20 rows (show()'s default) as an ASCII table.
flightData2015.show(n=20)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [9]:
# Register the DataFrame under a SQL name so the spark.sql() queries below can
# refer to it as `flight_data_2015`.
flightData2015.createOrReplaceTempView(name="flight_data_2015")

In [10]:
# The same per-destination row count written two ways: SQL against the temp
# view, and the DataFrame API. The explain() output that follows shows both
# produce the same physical plan.
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .count()
)

In [11]:
# Print the physical plan of each formulation, SQL first, so they can be compared.
for planSource in (sqlWay, dataFrameWay):
    planSource.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#15], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#15, 200)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#15], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/voyiger/projects/Spark-The-Definitive-Guide/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#15], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#15, 200)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#15], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/voyiger/projects/Spark-The-Definitive-Guide/data/flight-data/csv/201..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY

In [12]:
# Print up to 20 of the per-destination counts (show()'s default row limit).
dataFrameWay.show(n=20)

+--------------------+-----+
|   DEST_COUNTRY_NAME|count|
+--------------------+-----+
|            Anguilla|    1|
|              Russia|    1|
|            Paraguay|    1|
|             Senegal|    1|
|              Sweden|    1|
|            Kiribati|    1|
|              Guyana|    1|
|         Philippines|    1|
|            Djibouti|    1|
|            Malaysia|    1|
|           Singapore|    1|
|                Fiji|    1|
|              Turkey|    1|
|                Iraq|    1|
|             Germany|    1|
|              Jordan|    1|
|               Palau|    1|
|Turks and Caicos ...|    1|
|              France|    1|
|              Greece|    1|
+--------------------+-----+
only showing top 20 rows



In [13]:
# Import Spark's aggregate under an alias. A bare `import max` replaces Python's
# builtin max() in the kernel namespace for every later cell.
from pyspark.sql.functions import max as spark_max

# Largest single `count` value in the 2015 data. take(1) returns a one-element
# list of Row; the result column is still named "max(count)".
flightData2015.select(spark_max("count")).take(1)


[Row(max(count)=370002)]

In [14]:
# Top five destination countries ranked by total flights, i.e. sum(count).
# NOTE(review): despite its name, `maxSql` ranks by sum, not max.
top5DestinationsQuery = """
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
"""
maxSql = spark.sql(top5DestinationsQuery)

# At most 5 rows, so show()'s default limit of 20 prints them all.
maxSql.show(n=20)


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [15]:
# The same top-5 query written with the DataFrame API (the SQL version above is
# the preferred form in this notebook).
from pyspark.sql.functions import desc

# Total flights per destination; sum("count") names its output "sum(count)",
# so rename it to match the SQL alias.
destinationTotals = (
    flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
)

# orderBy is an alias of sort in the DataFrame API.
destinationTotals.orderBy(desc("destination_total")).limit(5).show()


+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [17]:
# Show each DataFrame as its own statement. `a.show(), b.show()` builds a tuple
# of the two None return values, which Jupyter echoes as a stray `(None, None)`.
myRange.show()
divisBy2.show()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows

+------+
|number|
+------+
|     0|
|     2|
|     4|
|     6|
|     8|
|    10|
|    12|
|    14|
|    16|
|    18|
|    20|
|    22|
|    24|
|    26|
|    28|
|    30|
|    32|
|    34|
|    36|
|    38|
+------+
only showing top 20 rows



(None, None)

In [4]:
# Word count over a local text file using the RDD API.
# This cell ran as In [4], after a kernel restart, so it builds (or reuses) its
# own SparkSession rather than relying on the one from earlier cells.
text_file = '/home/voyiger/projects/spark1/foo.txt'

spark = (
    SparkSession.builder
    .appName("PythonWordCount")
    .getOrCreate()
)

# One Python string per line of the file (column 0 of each Row).
lines = spark.read.text(text_file).rdd.map(lambda r: r[0])

# str.split() with no argument splits on any run of whitespace and never yields
# empty strings. split(' ') would count '' as a word for blank lines and for
# leading or repeated spaces, and would not split on tabs.
counts = (
    lines
    .flatMap(lambda x: x.split())
    .map(lambda x: (x, 1))
    .reduceByKey(add)
)

output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))

# Stops the session; any later cell that uses `spark` must recreate it.
spark.stop()

little: 1
star: 1
twinkle: 1
I: 2
sky: 1
what: 2
how: 2
bright: 1
so: 1
are: 3
in: 1
twindle: 1
you: 3
the: 1
wonder: 2
