<a href="https://colab.research.google.com/github/bkdevart/my_colab_demo_2/blob/main/spark_filtering_solution_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Environment setup cell: installs Java 11, downloads and unpacks the chosen
# Spark release, installs findspark, and sets JAVA_HOME / SPARK_HOME.
import os
# Find the latest version of Spark 3.x at https://downloads.apache.org/spark/ and enter it as the Spark version.
# For example:
# spark_version = 'spark-3.5.1'
# NOTE(review): if the chosen version is no longer listed at that URL, the quiet
# wget below will fail and the tar step will not find the archive — confirm the
# version is still available before running.
spark_version = 'spark-3.5.1'
# Exported to the process environment so the shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION']=spark_version

# Install Java 11 (headless JDK), download and unpack the Spark tarball, and install findspark
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
# JAVA_HOME presumably matches where the openjdk-11 package installs on this image — TODO confirm.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# Assumes the tarball was extracted under /content (i.e. the notebook's working
# directory is /content, as on Colab) — adjust if running elsewhere.
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Initialise findspark so that `pyspark` can be imported from the Spark install
# above; the SparkSession itself is created in a later cell.
import findspark
findspark.init()


Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done


In [None]:
# Create the SparkSession used by every cell below, or reuse one that already exists.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("DataFrameFunctions")
    .getOrCreate()
)


In [None]:
from pyspark import SparkFiles

# Register the remote CSV with the SparkContext so SparkFiles.get() can
# resolve it by file name.
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/1/nutrition.csv"
spark.sparkContext.addFile(url)

# ignoreLeadingWhiteSpace=True is required here: without it a leading
# whitespace character shows up in the column names.
df = (
    spark.read
    .options(sep=",", header=True, ignoreLeadingWhiteSpace=True)
    .csv(SparkFiles.get("nutrition.csv"))
)

# Show DataFrame
df.show()


+---------------+--------------+----------+---------+----------+-------------+--------------------+
|        dish_id|total_calories|total_mass|total_fat|total_carb|total_protein|         ingredients|
+---------------+--------------+----------+---------+----------+-------------+--------------------+
|dish_1561662216|    300.794281|193.000000|12.387489| 28.218290|    18.633970|soy sauce; garlic...|
|dish_1562688426|    137.569992| 88.000000| 8.256000|  5.190000|    10.297000|roasted potatoes;...|
|dish_1561662054|    419.438782|292.000000|23.838249| 26.351543|    25.910593|pepper; white ric...|
|dish_1562008979|    382.936646|290.000000|22.224644| 10.173570|    35.345387|jalapenos; lemon ...|
|dish_1560455030|     20.590000|103.000000| 0.148000|  4.625000|     0.956000|cherry tomatoes; ...|
|dish_1558372433|     74.360001|143.000000| 0.286000|  0.429000|    20.020000|          deprecated|
|dish_1563379132|    232.050003|119.000000|14.280000| 14.280000|    10.591001|         chilaquiles|


In [None]:
# Inspect the schema of the DataFrame loaded above. No schema was passed to the
# reader, so every column — including the numeric ones — was read in as a string.
df.printSchema()


root
 |-- dish_id: string (nullable = true)
 |-- total_calories: string (nullable = true)
 |-- total_mass: string (nullable = true)
 |-- total_fat: string (nullable = true)
 |-- total_carb: string (nullable = true)
 |-- total_protein: string (nullable = true)
 |-- ingredients: string (nullable = true)



In [None]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, DoubleType, StructType


In [None]:
# Build the list of struct fields: dish_id and ingredients stay strings, the
# five nutrition measures become doubles. Every field is nullable.
schema = [
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("dish_id", StringType()),
        ("total_calories", DoubleType()),
        ("total_mass", DoubleType()),
        ("total_fat", DoubleType()),
        ("total_carb", DoubleType()),
        ("total_protein", DoubleType()),
        ("ingredients", StringType()),
    ]
]

# Wrap the field list in a StructType so it can be passed to the reader.
final = StructType(fields=schema)


In [None]:
# Re-read the same CSV, this time applying the explicit schema so the
# nutrition columns come back as doubles instead of strings.
df = (
    spark.read
    .schema(final)
    .options(sep=",", header=True, ignoreLeadingWhiteSpace=True)
    .csv(SparkFiles.get("nutrition.csv"))
)
df.printSchema()


root
 |-- dish_id: string (nullable = true)
 |-- total_calories: double (nullable = true)
 |-- total_mass: double (nullable = true)
 |-- total_fat: double (nullable = true)
 |-- total_carb: double (nullable = true)
 |-- total_protein: double (nullable = true)
 |-- ingredients: string (nullable = true)



In [None]:
# Sort by total_calories, lowest first, and display the first 5 rows
df.sort(df.total_calories.asc()).show(5)


+---------------+--------------+----------+---------+----------+-------------+-------------+
|        dish_id|total_calories|total_mass|total_fat|total_carb|total_protein|  ingredients|
+---------------+--------------+----------+---------+----------+-------------+-------------+
|dish_1557861216|           0.0|       1.0|      0.0|       0.0|          0.0|   plate only|
|dish_1556575700|           0.0|      86.0|      0.0|       0.0|          0.0|   plate only|
|dish_1558461431|          1.15|       5.0|     0.02|      0.18|        0.145|spinach (raw)|
|dish_1558460205|          1.84|       8.0|    0.032|     0.288|        0.232|spinach (raw)|
|dish_1558459602|          2.07|       9.0|    0.036|     0.324|        0.261|spinach (raw)|
+---------------+--------------+----------+---------+----------+-------------+-------------+
only showing top 5 rows



In [None]:
# Sort by total_calories, highest first, and display the first 5 rows
df.sort(df.total_calories.desc()).show(5)


+---------------+--------------+----------+----------+----------+-------------+--------------------+
|        dish_id|total_calories|total_mass| total_fat|total_carb|total_protein|         ingredients|
+---------------+--------------+----------+----------+----------+-------------+--------------------+
|dish_1551567573|    9485.81543|     159.0|875.541016|506.078979|    87.785004|olives; sausage; ham|
|dish_1551567604|    9485.81543|     159.0|875.541016| 506.07901|    87.785004|ham; sausage; olives|
|dish_1551567508|   9170.099609|    7974.0|853.218018|   502.362|    63.792004|              olives|
|dish_1560974769|   3943.325195|    3051.0| 10.976549|844.568604|    83.749603|spinach (raw); to...|
|dish_1551389588|   2400.780029|     133.0| 27.552002|732.300049|    90.181999|corn; avocado; lemon|
+---------------+--------------+----------+----------+----------+-------------+--------------------+
only showing top 5 rows



In [None]:
# Import the average aggregate and compute the mean of total_calories
from pyspark.sql.functions import avg

df.select(avg(df["total_calories"])).show()


+-------------------+
|avg(total_calories)|
+-------------------+
| 223.98083459731635|
+-------------------+



In [None]:
# Keep only rows under 200 calories, with the condition given as a SQL expression string
df.where("total_calories<200").show()


+---------------+--------------+----------+---------+----------+-------------+--------------------+
|        dish_id|total_calories|total_mass|total_fat|total_carb|total_protein|         ingredients|
+---------------+--------------+----------+---------+----------+-------------+--------------------+
|dish_1562688426|    137.569992|      88.0|    8.256|      5.19|       10.297|roasted potatoes;...|
|dish_1560455030|         20.59|     103.0|    0.148|     4.625|        0.956|cherry tomatoes; ...|
|dish_1558372433|     74.360001|     143.0|    0.286|     0.429|        20.02|          deprecated|
|dish_1565640549|     45.482903|     139.0| 1.568471|  7.043886|     2.641478|tomatoes; cilantr...|
|dish_1561575474|    120.058434|     183.0| 4.966118| 17.412746|     2.990431|salt; eggplant; r...|
|dish_1550795690|     68.119995|     131.0|    0.262|     18.34|        0.393|               apple|
|dish_1565972591|    195.199997|     122.0|     12.2|      3.66|        17.08|chicken apple sau...|


In [None]:
# Filter to rows under 200 calories, then keep only the mass and macronutrient
# columns (dish_id, total_calories and ingredients are left out of the result).
(
    df.where("total_calories<200")
    .select('total_mass', 'total_fat', 'total_carb', 'total_protein')
    .show()
)


+----------+---------+----------+-------------+
|total_mass|total_fat|total_carb|total_protein|
+----------+---------+----------+-------------+
|      88.0|    8.256|      5.19|       10.297|
|     103.0|    0.148|     4.625|        0.956|
|     143.0|    0.286|     0.429|        20.02|
|     139.0| 1.568471|  7.043886|     2.641478|
|     183.0| 4.966118| 17.412746|     2.990431|
|     131.0|    0.262|     18.34|        0.393|
|     122.0|     12.2|      3.66|        17.08|
|      78.0|    0.234|     10.92|        0.546|
|      19.0|    0.038|       1.9|        0.171|
|      79.0|    0.079|     10.27|        0.395|
|       6.0|     2.52|     0.084|         2.22|
|      95.0| 2.571043|   3.28026|      2.80497|
|      88.0|    0.088|     14.96|         1.76|
|      46.0| 5.357862|  2.088042|     0.792647|
|     152.0| 4.168533| 10.369692|     1.329856|
|      75.0|    0.075|      9.75|        0.375|
|      52.0|   11.686|     11.48|        6.152|
|     134.0|    14.74|     2.144|    13.

In [None]:
# Same filter as the SQL-string version, expressed as a Python Column comparison
df.where(df.total_calories < 200).show()


+---------------+--------------+----------+---------+----------+-------------+--------------------+
|        dish_id|total_calories|total_mass|total_fat|total_carb|total_protein|         ingredients|
+---------------+--------------+----------+---------+----------+-------------+--------------------+
|dish_1562688426|    137.569992|      88.0|    8.256|      5.19|       10.297|roasted potatoes;...|
|dish_1560455030|         20.59|     103.0|    0.148|     4.625|        0.956|cherry tomatoes; ...|
|dish_1558372433|     74.360001|     143.0|    0.286|     0.429|        20.02|          deprecated|
|dish_1565640549|     45.482903|     139.0| 1.568471|  7.043886|     2.641478|tomatoes; cilantr...|
|dish_1561575474|    120.058434|     183.0| 4.966118| 17.412746|     2.990431|salt; eggplant; r...|
|dish_1550795690|     68.119995|     131.0|    0.262|     18.34|        0.393|               apple|
|dish_1565972591|    195.199997|     122.0|     12.2|      3.66|        17.08|chicken apple sau...|


In [None]:
# Keep rows where total_calories is below 200 OR total_mass is above 80.
# Each comparison needs its own parentheses when combined with `|`.
df.where(
    (df.total_calories < 200) | (df.total_mass > 80)
).show()


+---------------+--------------+----------+---------+----------+-------------+--------------------+
|        dish_id|total_calories|total_mass|total_fat|total_carb|total_protein|         ingredients|
+---------------+--------------+----------+---------+----------+-------------+--------------------+
|dish_1561662216|    300.794281|     193.0|12.387489|  28.21829|     18.63397|soy sauce; garlic...|
|dish_1562688426|    137.569992|      88.0|    8.256|      5.19|       10.297|roasted potatoes;...|
|dish_1561662054|    419.438782|     292.0|23.838249| 26.351543|    25.910593|pepper; white ric...|
|dish_1562008979|    382.936646|     290.0|22.224644|  10.17357|    35.345387|jalapenos; lemon ...|
|dish_1560455030|         20.59|     103.0|    0.148|     4.625|        0.956|cherry tomatoes; ...|
|dish_1558372433|     74.360001|     143.0|    0.286|     0.429|        20.02|          deprecated|
|dish_1563379132|    232.050003|     119.0|    14.28|     14.28|    10.591001|         chilaquiles|


In [None]:
# Keep only the rows whose ingredients column is exactly "bacon"
df.where(df.ingredients == "bacon").show()


+---------------+--------------+----------+---------+----------+-------------+-----------+
|        dish_id|total_calories|total_mass|total_fat|total_carb|total_protein|ingredients|
+---------------+--------------+----------+---------+----------+-------------+-----------+
|dish_1563381680|     32.459999|       6.0|     2.52|     0.084|         2.22|      bacon|
|dish_1559319860|     70.330002|      13.0|     5.46|     0.182|         4.81|      bacon|
|dish_1562086702|    178.529999|      33.0|    13.86|     0.462|        12.21|      bacon|
|dish_1551391710|    102.789993|      19.0|     7.98|     0.266|         7.03|      bacon|
|dish_1564073860|    492.309998|      91.0|38.219997|     1.274|    33.670002|      bacon|
|dish_1550776767|     81.149994|      15.0|      6.3|      0.21|         5.55|      bacon|
|dish_1558032156|    140.660004|      26.0|    10.92|     0.364|         9.62|      bacon|
|dish_1551136683|     70.330002|      13.0|     5.46|     0.182|         4.81|      bacon|