In [1]:
# findspark.init() runs before the pyspark imports in the next cell; presumably
# it is what makes the local Spark installation importable, so keep this cell
# first (NOTE(review): confirm against the environment's Spark setup).
import findspark
findspark.init()

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
# Create the Spark entry points used by every later cell:
#   sc    - SparkContext, used for the RDD examples
#   spark - SparkSession, used for the DataFrame examples
# getOrCreate() is used for both so that re-running this cell in a live kernel
# reuses the running context instead of failing because a SparkContext
# already exists.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

Reading a file

In [38]:
# Load the CSV as an RDD of raw text lines.
rdd_raw = sc.textFile('testdata.csv')
print(type(rdd_raw))

# The first line holds the column names. Keep only the lines that differ from
# it (any later line identical to the header is dropped as well).
header = rdd_raw.first()
rdd = rdd_raw.filter(lambda line: line != header)

<class 'pyspark.rdd.RDD'>


In [39]:
# Read the same CSV through the DataFrame API, using Spark's built-in CSV
# reader rather than the legacy external "com.databricks.spark.csv" source name.
# header=True takes the column names from the first line. No schema is
# supplied or inferred, so every column is loaded as a string.
dataframe = spark.read.csv('testdata.csv', header=True, sep=',')
print(type(dataframe))

<class 'pyspark.sql.dataframe.DataFrame'>


Showing data

In [40]:
# Preview the first ten data lines of the RDD; each element is one raw,
# unsplit CSV line.
first_rows = rdd.take(10)
for row in first_rows:
    print(row)

1/2/09 6:17,Product1,1200,Mastercard,carolina,Basildon,England,United Kingdom,1/2/09 6:00,1/2/09 6:08,51.5,-1.1166667
1/2/09 4:53,Product1,1200,Visa,Betina,Parkville                   ,MO,United States,1/2/09 4:42,1/2/09 7:49,39.195,-94.68194
1/2/09 13:08,Product1,1200,Mastercard,Federica e Andrea,Astoria                     ,OR,United States,1/1/09 16:21,1/3/09 12:32,46.18806,-123.83
1/3/09 14:44,Product1,1200,Visa,Gouya,Echuca,Victoria,Australia,9/25/05 21:13,1/3/09 14:22,-36.1333333,144.75
1/4/09 12:56,Product2,3600,Visa,Gerd W ,Cahaba Heights              ,AL,United States,11/15/08 15:47,1/4/09 12:45,33.52056,-86.8025
1/4/09 13:19,Product1,1200,Visa,LAURENCE,Mickleton                   ,NJ,United States,9/24/08 15:19,1/4/09 13:04,39.79,-75.23806
1/4/09 20:11,Product1,1200,Mastercard,Fleur,Peoria                      ,IL,United States,1/3/09 9:38,1/4/09 19:45,40.69361,-89.58889
1/2/09 20:09,Product1,1200,Mastercard,adam,Martin                      ,TN,United States,1/2/09 17:43,1/4/

In [41]:
# Preview the first ten rows of the DataFrame as a formatted table.
dataframe.show(n=10)

+----------------+--------+-----+------------+-----------------+--------------------+-------------+--------------+---------------+------------+-----------+----------+
|Transaction_date| Product|Price|Payment_Type|             Name|                City|        State|       Country|Account_Created|  Last_Login|   Latitude| Longitude|
+----------------+--------+-----+------------+-----------------+--------------------+-------------+--------------+---------------+------------+-----------+----------+
|     1/2/09 6:17|Product1| 1200|  Mastercard|         carolina|            Basildon|      England|United Kingdom|    1/2/09 6:00| 1/2/09 6:08|       51.5|-1.1166667|
|     1/2/09 4:53|Product1| 1200|        Visa|           Betina|Parkville        ...|           MO| United States|    1/2/09 4:42| 1/2/09 7:49|     39.195| -94.68194|
|    1/2/09 13:08|Product1| 1200|  Mastercard|Federica e Andrea|Astoria          ...|           OR| United States|   1/1/09 16:21|1/3/09 12:32|   46.18806|   -123.83

In [42]:
# Print each column's name, type and nullability. As the output shows, every
# column (including Price, Latitude and Longitude) was loaded as a string.
dataframe.printSchema()

root
 |-- Transaction_date: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Account_Created: string (nullable = true)
 |-- Last_Login: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



Filtering

In [43]:
# Keep only the rows whose Product column (the second CSV field) is not exactly
# 'Product1'. Comparing the field itself, rather than searching the whole line
# for the substring, avoids dropping rows where another column happens to
# contain "Product1" or where the product is e.g. "Product10". It also matches
# the DataFrame filter in the next cell.
# split(",", 2)[1:2] yields a one-element list holding the second field, or an
# empty list for a line with no comma, so short lines are kept instead of
# raising IndexError.
rdd_filtered = rdd.filter(lambda row_str: row_str.split(",", 2)[1:2] != ['Product1'])
print(rdd_filtered.count())

151


In [44]:
from pyspark.sql.functions import col

# DataFrame counterpart of the RDD filter: keep the rows whose Product column
# is not equal to "Product1", then count them.
is_not_product1 = col("Product") != "Product1"
dataframe_filtered = dataframe.filter(is_not_product1)
print(dataframe_filtered.count())

151


Grouping by multiple columns + calculating average

In [45]:
def _to_record(line):
    """Split one raw CSV line on commas and return its first four fields.

    The third field is converted to float; the other three stay as strings.
    In the header order shown by the DataFrame these fields are
    Transaction_date, Product, Price and Payment_Type.
    """
    fields = line.split(",")
    return (fields[0], fields[1], float(fields[2]), fields[3])


rdd_split = rdd.map(_to_record)
for row in rdd_split.take(10):
    print(row)

('1/2/09 6:17', 'Product1', 1200.0, 'Mastercard')
('1/2/09 4:53', 'Product1', 1200.0, 'Visa')
('1/2/09 13:08', 'Product1', 1200.0, 'Mastercard')
('1/3/09 14:44', 'Product1', 1200.0, 'Visa')
('1/4/09 12:56', 'Product2', 3600.0, 'Visa')
('1/4/09 13:19', 'Product1', 1200.0, 'Visa')
('1/4/09 20:11', 'Product1', 1200.0, 'Mastercard')
('1/2/09 20:09', 'Product1', 1200.0, 'Mastercard')
('1/4/09 13:17', 'Product1', 1200.0, 'Mastercard')
('1/4/09 14:11', 'Product1', 1200.0, 'Visa')


In [50]:
# Average price per (product, payment type) pair using plain RDD operations:
#   1. key each record by (product, payment type) with the value (price, 1)
#   2. add up prices and counts per key
#   3. divide the price total by the count
# Keys are used verbatim, so values that differ only by whitespace
# (e.g. 'Product3 ' in the output) form a group of their own.
test = (
    rdd_split
    .map(lambda rec: ((rec[1], rec[3]), (rec[2], 1)))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
)
for row in test.take(100):
    print(row)

(('Product1', 'Diners'), 1200.0)
(('Product1', 'Amex'), 1202.2727272727273)
(('Product2', 'Amex'), 3600.0)
(('Product2', 'Diners'), 3600.0)
(('Product3', 'Diners'), 7500.0)
(('Product3', 'Amex'), 7500.0)
(('Product1', 'Mastercard'), 1204.0772532188842)
(('Product1', 'Visa'), 1224.3820224719102)
(('Product2', 'Visa'), 3600.0)
(('Product2', 'Mastercard'), 3600.0)
(('Product3', 'Mastercard'), 7500.0)
(('Product3 ', 'Mastercard'), 7500.0)
(('Product3', 'Visa'), 7500.0)


In [52]:
from pyspark.sql.functions import avg

# DataFrame counterpart of the RDD aggregation: mean Price for every
# (Product, Payment_Type) group.
# The grouping column is spelled exactly as in the schema ("Payment_Type") so
# the lookup does not depend on case-insensitive column resolution.
# Price was loaded as a string column; avg() is applied to it directly and the
# output shows numeric averages.
dataframe.groupBy("Product", "Payment_Type").agg(avg("Price")).show(20)

+---------+------------+------------------+
|  Product|Payment_type|        avg(Price)|
+---------+------------+------------------+
| Product3|        Amex|            7500.0|
| Product1|  Mastercard|1204.0772532188842|
| Product3|        Visa|            7500.0|
| Product1|      Diners|            1200.0|
| Product3|      Diners|            7500.0|
| Product2|  Mastercard|            3600.0|
|Product3 |  Mastercard|            7500.0|
| Product2|      Diners|            3600.0|
| Product2|        Visa|            3600.0|
| Product2|        Amex|            3600.0|
| Product1|        Amex|1202.2727272727273|
| Product1|        Visa|1224.3820224719102|
| Product3|  Mastercard|            7500.0|
+---------+------------+------------------+

