In [1]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[2]").setAppName("RDD Example")
sc = SparkContext(conf=conf)

# different way of setting configurations
# conf.setMaster('some url')
# conf.set('spark.executor.memory', '2g')
# conf.set('spark.executor.cores', '4')
# conf.set('spark.cores.max', '40')
# conf.set('spark.logConf', True)

# sparkContext.parallelize materializes data into RDD
# documentation: https://spark.apache.org/docs/2.1.1/programming-guide.html#parallelized-collections
rdd = sc.parallelize(
    [("Richard", 22), ("Alfred", 23), ("Loki", 4), ("Albert", 12), ("Alfred", 9)]
)

rdd.collect()  # [('Richard', 22), ('Alfred', 23), ('Loki', 4), ('Albert', 12), ('Alfred', 9)]

# create two different RDDs
left = sc.parallelize([("Richard", 1), ("Alfred", 4)])
right = sc.parallelize([("Richard", 2), ("Alfred", 5)])

joined_rdd = left.join(right)
collected = joined_rdd.collect()

23/09/12 12:52:44 WARN Utils: Your hostname, masoud-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface wlp2s0)
23/09/12 12:52:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/09/12 12:52:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/12 12:53:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


                                                                                

In [2]:
collected  # [('Alfred', (4, 5)), ('Richard', (1, 2))]

[('Alfred', (4, 5)), ('Richard', (1, 2))]

In [3]:
# Code Used in SparkSession Example
# Notice we’re using pyspark.sql library here
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("CSV file loader").getOrCreate()

# couple ways of setting configurations
# spark.conf.set("spark.executor.memory", '8g')
# spark.conf.set('spark.executor.cores', '3')
# spark.conf.set('spark.cores.max', '3')
# spark.conf.set("spark.driver.memory", '8g')

file_path = "data/AB_NYC_2019.csv"
# Always load csv files with header=True
df = spark.read.csv(file_path, header=True)

In [4]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



In [5]:
df.select("neighbourhood").distinct().show(10, False)



+-------------+
|neighbourhood|
+-------------+
|Corona       |
|Richmondtown |
|Prince's Bay |
|Westerleigh  |
|Mill Basin   |
|40.76199     |
|Civic Center |
|40.83166     |
|Douglaston   |
|Mount Hope   |
+-------------+
only showing top 10 rows



                                                                                