In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
num = spark.range(1,5)

In [0]:
filtered = num.filter(col("id") >2)
filtered.show()

+---+
| id|
+---+
|  3|
|  4|
+---+



In [0]:
import pandas as pd
df = pd.read_csv("/Workspace/Users/digicomstarter43@gmail.com/mnm_dataset.csv")
df.sample(5)

Unnamed: 0,State,Color,Count
16087,WA,Blue,79
45060,TX,Green,49
29724,CO,Brown,69
74586,AZ,Brown,65
135,NV,Red,94


In [0]:
# Build a SparkSession using the SparkSession APIs.
# If one does not exist, then create an instance. There
# can only be one SparkSession per JVM
spark = (SparkSession 
         .builder
         .appName("PythonMnMCount")
         .getOrCreate())


In [0]:
mnm_df = (spark.read.format("csv")
          .option("header","true")
          .option("inferSchema","true")
          .load("/Workspace/Users/digicomstarter43@gmail.com/mnm_dataset.csv"))

In [0]:
# We use the DataFrame high-level APIs. Note
# that we don't use RDDs at all. Because some of Spark's
# functions return the same object, we can chain function calls.
# 1. Select from the DataFrame the fields "State", "Color", and "Count"
# 2. Since we want to group each state and its M&M color count,
# we use groupBy()
# 3. Aggregate counts of all colors and groupBy() State and Color
# 4 orderBy() in descending order

# In Databrick, We are getting an error which running actions on the spark dataframe read by spark directly from csv file
# So We read file with pandas and converted pandas dataframe in Spark dataframe

mnm_df = spark.createDataFrame(df)
count_mnm_df = (mnm_df
                .select("State","Color","Count")
                .groupBy("State","Color")
                .agg(count("Count").alias("Total"))
                .orderBy("Total", ascending=False))
count_mnm_df.show(n=60, truncate=False)
print(f"Total Rows {count_mnm_df.count()}")

+-----+------+-----+
|State|Color |Total|
+-----+------+-----+
|CA   |Yellow|1807 |
|WA   |Green |1779 |
|OR   |Orange|1743 |
|TX   |Green |1737 |
|TX   |Red   |1725 |
|CA   |Green |1723 |
|CO   |Yellow|1721 |
|CA   |Brown |1718 |
|CO   |Green |1713 |
|NV   |Orange|1712 |
|TX   |Yellow|1703 |
|NV   |Green |1698 |
|AZ   |Brown |1698 |
|CO   |Blue  |1695 |
|WY   |Green |1695 |
|NM   |Red   |1690 |
|AZ   |Orange|1689 |
|NM   |Yellow|1688 |
|NM   |Brown |1687 |
|UT   |Orange|1684 |
|NM   |Green |1682 |
|UT   |Red   |1680 |
|AZ   |Green |1676 |
|NV   |Yellow|1675 |
|NV   |Blue  |1673 |
|WA   |Red   |1671 |
|WY   |Red   |1670 |
|WA   |Brown |1669 |
|NM   |Orange|1665 |
|WY   |Blue  |1664 |
|WA   |Yellow|1663 |
|WA   |Orange|1658 |
|CA   |Orange|1657 |
|NV   |Brown |1657 |
|CA   |Red   |1656 |
|CO   |Brown |1656 |
|UT   |Blue  |1655 |
|AZ   |Yellow|1654 |
|TX   |Orange|1652 |
|AZ   |Red   |1648 |
|OR   |Blue  |1646 |
|OR   |Red   |1645 |
|UT   |Yellow|1645 |
|CO   |Orange|1642 |
|TX   |Brown 

In [0]:
count_mnm_df.select("State","Color","Total").where(col("State") == "CA").show(n=10, truncate=False) # For a custom state 

+-----+------+-----+
|State|Color |Total|
+-----+------+-----+
|CA   |Yellow|1807 |
|CA   |Green |1723 |
|CA   |Brown |1718 |
|CA   |Orange|1657 |
|CA   |Red   |1656 |
|CA   |Blue  |1603 |
+-----+------+-----+



In [0]:
spark.stop() # Stop the SparkSession