In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()

In [0]:
df = spark.range(10000).toDF("number")

In [0]:
flight_data = spark.read.option("inferSchema", "true").option("header", "true").csv("/Volumes/workspace/myschema/myvolume/2015-summary.csv")
flight_data.sort("count")
spark.conf.set("spark.sql.shuffle.partitions", "50")
flight_data.sort("count").take(100)

[Row(DEST_COUNTRY_NAME='Zambia', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Cyprus', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Saint Vincent and the Grenadines', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='Indonesia', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Georgia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Namibia', count=1),
 Row(DEST_COUNTRY_NAME='Suriname', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Estonia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Papua New Guinea', count=1),
 Row(DEST_COUNTRY_NAME='New Caledonia', ORIGIN_COUNTRY_NAME='United States

In [0]:
flight_data.createOrReplaceTempView("flight_data")

In [0]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = flight_data\
.groupBy("DEST_COUNTRY_NAME")\
.count()

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonGroupingAgg(keys=[DEST_COUNTRY_NAME#13459], functions=[finalmerge_count(merge count#13493L) AS count(1)#13490L])
         +- PhotonShuffleExchangeSource
            +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#9043]
               +- PhotonShuffleExchangeSink hashpartitioning(DEST_COUNTRY_NAME#13459, 50)
                  +- PhotonGroupingAgg(keys=[DEST_COUNTRY_NAME#13459], functions=[partial_count(1) AS count#13493L])
                     +- PhotonRowToColumnar
                        +- FileScan csv [DEST_COUNTRY_NAME#13459] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/Volumes/workspace/myschema/myvolume/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Photon Explanation ==
The query is fully supported by Photon.
== Physical Plan ==
AdaptiveSp

In [0]:
flightData2015 = flight_data

In [0]:
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

In [0]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()

from pyspark.sql.functions import desc

flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [0]:
staticDataFrame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/workspace/myschema/myvolume/2010-12-01.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

from pyspark.sql.functions import window, column, desc, col

staticDataFrame.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate").groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day")).sum("total_cost").show(5)

spark.conf.set("spark.sql.shuffle.partitions", "5")

In [0]:
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.option("checkpointLocation", "/tmp/checkpoints/retail")\
.format("csv")\
.option("header", "true")\
.load("/data/retail-data/by-day/*.csv")

purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")

purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
.show(5)

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-5330412972019294>, line 2[0m
[1;32m      1[0m streamingDataFrame [38;5;241m=[39m spark[38;5;241m.[39mreadStream\
[0;32m----> 2[0m [38;5;241m.[39mschema(staticSchema)\
[1;32m      3[0m [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mmaxFilesPerTrigger[39m[38;5;124m"[39m, [38;5;241m1[39m)\
[1;32m      4[0m [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mcheckpointLocation[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124m/tmp/checkpoints/retail[39m[38;5;124m"[39m)\
[1;32m      5[0m [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcsv[39m[38;5;124m"[39m)\
[1;32m      6[0m [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)\
[1;32m      7[0m [38;5;241m.[39mload([38;5;

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType(
    [StructField('DEST_COUNTRY_NAME', StringType(), True), 
     StructField('ORIGIN_COUNTRY_NAME', StringType(), True), 
     StructField('count', LongType(), True)]
    )
df = spark.read.format("json").schema(schema)\
    .load("/Volumes/workspace/myschema/myvolume/2015-summary.json")\
        .sortWithinPartitions("count")

In [0]:
from pyspark.sql.functions import expr

df.createOrReplaceTempView("dfTable")
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows
+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows


In [0]:
# df.selectExpr(
# "*", # all original columns
# "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
# .show(2)

# df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

# from pyspark.sql.functions import lit
# df.select(expr("*"), lit(1).alias("One")).show(2)
# "*", # all original columns
# "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
# .show(2)

# df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

# from pyspark.sql.functions import lit


# df.select(expr("*"), lit(1).alias("One")).show(2)

new_df = df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
    .withColumn("count_doubled", expr("count * 2"))\
        .withColumnRenamed("count", "count_origin")
new_df.show(2)

from pyspark.sql.functions import col

# new_df.filter(col("count_doubled") > 200).filter("DEST_COUNTRY_NAME = 'United States'").show(2)

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

+-----------------+-------------------+------------+-------------+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count_origin|withinCountry|count_doubled|
+-----------------+-------------------+------------+-------------+-------------+
|    United States|            Romania|          15|        false|           30|
|    United States|            Croatia|           1|        false|            2|
+-----------------+-------------------+------------+-------------+-------------+
only showing top 2 rows


256

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import desc, asc

schemaa = df.schema
newRows = [
Row("New Country", "Other Country", 5),
Row("New Country 2", "Other Country 3", 1)
]
newDF = spark.createDataFrame(newRows, schema=schemaa)
df.union(newDF)\
    .where("count>1")\
        .where(col("DEST_COUNTRY_NAME") != "United States")\
            .sort(col("count").desc(), col("ORIGIN_COUNTRY_NAME").asc())\
                .show()

+------------------+-------------------+-----+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+------------------+-------------------+-----+
|            Canada|      United States| 8399|
|            Mexico|      United States| 7140|
|    United Kingdom|      United States| 2025|
|             Japan|      United States| 1548|
|           Germany|      United States| 1468|
|Dominican Republic|      United States| 1353|
|       South Korea|      United States| 1048|
|       The Bahamas|      United States|  955|
|            France|      United States|  935|
|          Colombia|      United States|  873|
|            Brazil|      United States|  853|
|       Netherlands|      United States|  776|
|             China|      United States|  772|
|           Jamaica|      United States|  666|
|        Costa Rica|      United States|  588|
|       El Salvador|      United States|  561|
|            Panama|      United States|  510|
|              Cuba|      United States|  466|
|            

In [0]:
df.repartition(5, col("DEST_COUNTRY_NAME")).explain()
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()

# collectDF.toLocalIterator()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonShuffleExchangeSource
         +- PhotonShuffleMapStage REPARTITION_BY_NUM, [id=#9273]
            +- PhotonShuffleExchangeSink hashpartitioning(DEST_COUNTRY_NAME#13164, 5)
               +- PhotonJsonScan json [DEST_COUNTRY_NAME#13164,ORIGIN_COUNTRY_NAME#13165,count#13166L] Batched: true, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[dbfs:/Volumes/workspace/myschema/myvolume/2015-summary.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint>


== Photon Explanation ==
The query is fully supported by Photon.
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|   

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]