In [17]:
# Install Spark and Java

# !pip install pyspark
# !apt-get update
# !apt-get install openjdk-11-jdk-headless -qq > /dev/null
# !wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
# !tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
# !pip install -q findspark

reviewing the basics of spark

In [18]:
import os
import findspark
from pyspark.sql import SparkSession
from pyspark import SparkFiles

# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession

findspark.init()

In [19]:
# Start Spark session
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()

In [20]:
# Create DataFrame manually
dataframe = spark.createDataFrame(
      [
      (0, "Hello World"),
      (1, "Here is my first DataFrame"),
      (2, "Let's learn more!")
    ], ["id", "words"])

dataframe.show()

+---+--------------------+
| id|               words|
+---+--------------------+
|  0|         Hello World|
|  1|Here is my first ...|
|  2|   Let's learn more!|
+---+--------------------+



In [22]:

url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/food.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("food.csv"), sep=",", header=True)

In [None]:
df.show()

+-------+-----+
|   food|price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+



In [23]:
df.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: string (nullable = true)



In [24]:
df.columns

['food', 'price']

In [25]:
df.describe()

DataFrame[summary: string, food: string, price: string]

In [26]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [27]:
#create the list of struct fields
schema = [StructField("food", StringType(), True), StructField("price", IntegerType(), True),]
schema

[StructField(food,StringType,true), StructField(price,IntegerType,true)]

In [28]:
# Pass in our fields
final = StructType(fields=schema)
final

StructType(List(StructField(food,StringType,true),StructField(price,IntegerType,true)))

In [31]:
#read dataframe with new schema
dataframe = spark.read.csv(SparkFiles.get("food.csv"), schema=final, sep=",", header=True)
dataframe.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: integer (nullable = true)



In [32]:
dataframe['price']

Column<'price'>

In [33]:
type(dataframe['price'])

pyspark.sql.column.Column

In [34]:
dataframe.select('price')

DataFrame[price: int]

In [35]:
type(dataframe.select('price'))

pyspark.sql.dataframe.DataFrame

In [36]:
dataframe.select('price').show()

+-----+
|price|
+-----+
|    0|
|   12|
|   10|
+-----+



In [40]:
# Add new column
dataframe.withColumn('newprice', dataframe['price']).show()
# Update column name
dataframe.withColumnRenamed('price','newerprice').show()
# Double the price
dataframe.withColumn('doubleprice',dataframe['price']*2).show()
# Add a dollar to the price
dataframe.withColumn('add_one_dollar',dataframe['price']+1).show()
# Half the price
dataframe.withColumn('half_price',dataframe['price']/2).show()

+-------+-----+--------+
|   food|price|newprice|
+-------+-----+--------+
|  pizza|    0|       0|
|  sushi|   12|      12|
|chinese|   10|      10|
+-------+-----+--------+

+-------+----------+
|   food|newerprice|
+-------+----------+
|  pizza|         0|
|  sushi|        12|
|chinese|        10|
+-------+----------+

+-------+-----+-----------+
|   food|price|doubleprice|
+-------+-----+-----------+
|  pizza|    0|          0|
|  sushi|   12|         24|
|chinese|   10|         20|
+-------+-----+-----------+

+-------+-----+--------------+
|   food|price|add_one_dollar|
+-------+-----+--------------+
|  pizza|    0|             1|
|  sushi|   12|            13|
|chinese|   10|            11|
+-------+-----+--------------+

+-------+-----+----------+
|   food|price|half_price|
+-------+-----+----------+
|  pizza|    0|       0.0|
|  sushi|   12|       6.0|
|chinese|   10|       5.0|
+-------+-----+----------+



In [46]:
# Add new column
dataframe2 = dataframe.withColumn('newprice', dataframe['price'])
# Update column name
dataframe2 = dataframe2.withColumnRenamed('newprice','newerprice')

# Double the price
dataframe2 = dataframe2.withColumn('doubleprice',dataframe2['price']*2)
# Add a dollar to the price
dataframe2 = dataframe2.withColumn('add_one_dollar',dataframe2['price']+1)
# Half the price
dataframe2 = dataframe2.withColumn('half_price',dataframe2['price']/2)

dataframe2.show()

+-------+-----+----------+-----------+--------------+----------+
|   food|price|newerprice|doubleprice|add_one_dollar|half_price|
+-------+-----+----------+-----------+--------------+----------+
|  pizza|    0|         0|          0|             1|       0.0|
|  sushi|   12|        12|         24|            13|       6.0|
|chinese|   10|        10|         20|            11|       5.0|
+-------+-----+----------+-----------+--------------+----------+



In [42]:
dataframe.select('price').collect()

# set spark dataframe to a pandas data frame
import pandas as pd

pandas_df = dataframe.toPandas()

pandas_df.head()

Unnamed: 0,food,price
0,pizza,0
1,sushi,12
2,chinese,10
