A PySpark program isn’t much different from a regular Python program, but its execution model can differ considerably, especially if you’re running on a cluster.

On a cluster, a lot can happen behind the scenes to distribute the processing across multiple nodes. For now, however, think of the program as a Python program that uses the PySpark library.

Now that you’ve seen some common functional concepts that exist in Python as well as a simple PySpark program, it’s time to dive deeper into Spark and PySpark.

To interact with PySpark, you create specialized data structures called Resilient Distributed Datasets (RDDs).

RDDs hide the complexity of transforming your data and distributing it across multiple nodes. If you’re running on a cluster, a scheduler handles this automatically.
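
As a rough mental model only, the plain-Python sketch below splits a list into partitions, filters each partition independently, and then combines the results. The helper names `split_into_partitions` and `filter_partition` are hypothetical and are not part of PySpark. On a real cluster the per-partition work would run on separate nodes.

```python
# Plain-Python analogy for partitioned processing; this is not how Spark is implemented.

def split_into_partitions(data, num_partitions):
    """Split a sequence into num_partitions contiguous chunks (hypothetical helper)."""
    items = list(data)
    size, extra = divmod(len(items), num_partitions)
    partitions = []
    start = 0
    for i in range(num_partitions):
        # The first `extra` partitions receive one additional element.
        end = start + size + (1 if i < extra else 0)
        partitions.append(items[start:end])
        start = end
    return partitions

def filter_partition(partition, predicate):
    """Apply the predicate to one partition in isolation (hypothetical helper)."""
    return [x for x in partition if predicate(x)]

partitions = split_into_partitions(range(10), 3)
filtered = [filter_partition(p, lambda x: x % 2 != 0) for p in partitions]
# Combine the per-partition results into a single list.
combined = [x for part in filtered for x in part]
print(partitions)
print(combined)
```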

In [3]:
!pip install pyspark



In [1]:
import pandas as pd
#from pyspark.sql import SparkSession
#from pyspark.context import SparkContext
#from pyspark.sql.functions import *
#from pyspark.sql.types import *
#from datetime import date, timedelta, datetime
import time

In [2]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [5]:
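import random
NUM_SAMPLES = 100000000
def inside(p):
    # p is the sample index supplied by the RDD; it is ignored on purpose.
    x, y = random.random(), random.random()
    # True when the random point falls inside the unit quarter circle.
    return x*x + y*y < 1
# Count, in parallel, the samples that land inside the quarter circle.
count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
pi = 4 * count / NUM_SAMPLES
print('Pi is roughly', pi)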

Pi is roughly 3.14170384
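
The estimate works because a point drawn uniformly from the unit square lands inside the quarter circle of radius 1 with probability pi/4, so four times the observed fraction approximates pi. Below is a minimal local sketch of the same estimator without Spark. The smaller sample size, the fixed seed, and the `estimate_pi` name are assumptions chosen only for illustration.

```python
import random

def estimate_pi(num_samples, seed=0):
    """Estimate pi by sampling points uniformly in the unit square."""
    rng = random.Random(seed)  # fixed seed so repeated runs give the same estimate
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # point lies inside the quarter circle
            hits += 1
    return 4 * hits / num_samples

local_estimate = estimate_pi(100_000)
print('Local estimate of pi:', local_estimate)
```

The Spark version above performs the same counting, but spreads the samples across partitions.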


In [6]:
big_list = range(10000)
rdd = sc.parallelize(big_list, 2)
odds = rdd.filter(lambda x: x % 2 != 0)
odds.take(5)

[1, 3, 5, 7, 9]
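
In the cell above, `filter()` is a transformation that only describes the computation, while `take(5)` is the action that actually runs it. Python generators behave in a loosely similar way, which the sketch below uses as an analogy. It does not involve Spark, and the `checked` list exists only to show how many elements were examined.

```python
from itertools import islice

checked = []  # records which elements the predicate actually examined

def is_odd(x):
    checked.append(x)
    return x % 2 != 0

# Building the generator does no work yet, much like a lazy transformation.
lazy_odds = (x for x in range(10000) if is_odd(x))
assert checked == []

# Pulling five results forces evaluation, but only as far as needed.
first_five = list(islice(lazy_odds, 5))
print(first_five)
print(len(checked))
```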

In [None]:
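# Create Spark DataFrames from different file formats.
# DataFrame readers live on a SparkSession, not on the SparkContext (sc).
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# JSON
dataframe = spark.read.json('dataset/nyt2.json')
# Text files
dataframe_txt = spark.read.text('text_data.txt')
# CSV files
dataframe_csv = spark.read.csv('csv_data.csv')
# Parquet files (load() reads Parquet by default)
dataframe_parquet = spark.read.load('parquet_data.parquet')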

In [None]:
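# Duplicate rows in a table can be eliminated with the dropDuplicates() function.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  # DataFrame readers need a SparkSession
dataframe = spark.read.json('dataset/nyt2.json')
dataframe.show(10)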

In [None]:
# After dropDuplicates() is applied, the duplicate rows are removed from the dataset.
dataframe_dropdup = dataframe.dropDuplicates()
dataframe_dropdup.show(10)
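
To make the effect concrete, here is a plain-Python sketch of row-level de-duplication on a few made-up records. The rows and the `drop_duplicate_rows` helper are hypothetical and are not taken from the dataset or from PySpark. This sketch keeps the rows in their original order, whereas Spark makes no guarantee about the order of the remaining rows.

```python
def drop_duplicate_rows(rows):
    """Return rows with exact duplicates removed, keeping first occurrences."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(sorted(row.items()))  # hashable fingerprint of the whole row
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique

# Made-up example records, for illustration only.
rows = [
    {"author": "A. Writer", "title": "First Book"},
    {"author": "B. Author", "title": "Second Book"},
    {"author": "A. Writer", "title": "First Book"},  # exact duplicate of the first row
]
unique_rows = drop_duplicate_rows(rows)
print(len(rows), len(unique_rows))
```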

In [None]:
# Show the first 10 entries in the author column
dataframe.select("author").show(10)
# Show the first 10 entries in the author, title, rank, and price columns
dataframe.select("author", "title", "rank", "price").show(10)

In [3]:
txt = sc.textFile('.data/LICENSE.txt')
print(txt.count())

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/marigaldina/projects/python_blog_materials/.data/LICENSE.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:844)
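
The traceback above reports that the input path does not exist. One way to catch this earlier, sketched here under the assumption that the file lives on the local filesystem, is to resolve and check the path before handing it to `sc.textFile()`. The `resolve_input_path` helper is hypothetical.

```python
from pathlib import Path

def resolve_input_path(path_str):
    """Return the absolute path if the file exists, otherwise raise FileNotFoundError."""
    path = Path(path_str).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError(f"Input path does not exist: {path}")
    return str(path)

try:
    checked_path = resolve_input_path('.data/LICENSE.txt')
except FileNotFoundError as err:
    # Report the problem in Python instead of waiting for a Java stack trace.
    checked_path = None
    print(err)
```

If the check passes, the returned string can be passed to `sc.textFile()`. The printed message also shows the absolute path that was tried, which makes a mistyped relative path easier to spot.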


In [None]:

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())