In [31]:
from operator import add
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f

FILE_IN='../fixtures/test.csv'
SCHEMA=StructType([
    StructField('account_id', LongType(), False),
    StructField('last_name', StringType(), False),
    StructField('first_name', StringType(), False),
    StructField('phone', StringType(), False),
    StructField('address_1', StringType(), False),
    StructField('address_2', StringType(), True),
    StructField('city', StringType(), False),
    StructField('state', StringType(), False),
    StructField('postal_code', StringType(), False),
    StructField('plan_id', StringType(), False),
    StructField('foundation_id', StringType(), True),
    StructField('joined_at', TimestampType(), False),
    StructField('prev_balance', FloatType(), True),
    StructField('adjustments', FloatType(), True),
    StructField('prev_voice', IntegerType(), True),
    StructField('prev_data', IntegerType(), True),
    StructField('line', StringType(), False),
    StructField('txn_type', StringType(), False),
    StructField('txn_at', StringType(), False),
    StructField('place', StringType(), True),
    StructField('sent_recv', StringType(), True),
    StructField('to_from', StringType(), True),
    StructField('in_plan', IntegerType(), True),
    StructField('in_network', IntegerType(), True),
    StructField('mins', IntegerType(), True),
    StructField('type_unit', StringType(), True)
])

In [2]:
# override default spark session var
spark = (SparkSession
         .builder
         #.config("spark.driver.cores", "2")
         .appName("read_csv")
         .getOrCreate())

In [3]:
# read data from sample CSV file, apply schema.
df = (spark.read
      .option("header", "false")
      #.option("mode", "DROPMALFORMED")
      #.option("inferSchema", "true") 
      .schema(SCHEMA)
      .csv(FILE_IN))
df.dtypes

[('account_id', 'bigint'),
 ('last_name', 'string'),
 ('first_name', 'string'),
 ('phone', 'string'),
 ('address_1', 'string'),
 ('address_2', 'string'),
 ('city', 'string'),
 ('state', 'string'),
 ('postal_code', 'string'),
 ('plan_id', 'string'),
 ('foundation_id', 'string'),
 ('joined_at', 'timestamp'),
 ('prev_balance', 'float'),
 ('adjustments', 'float'),
 ('prev_voice', 'int'),
 ('prev_data', 'int'),
 ('line', 'string'),
 ('txn_type', 'string'),
 ('txn_at', 'string'),
 ('place', 'string'),
 ('sent_recv', 'string'),
 ('to_from', 'string'),
 ('in_plan', 'int'),
 ('in_network', 'int'),
 ('mins', 'int'),
 ('type_unit', 'string')]

In [4]:
df.head()

Row(account_id=1006461066240, last_name='Corgan', first_name='Cocktail', phone='3063434235', address_1='36328 Rachel Wall', address_2=None, city='Michellemouth', state='VI', postal_code='59957', plan_id='PLAN4000', foundation_id=None, joined_at=datetime.datetime(2007, 1, 9, 12, 46, 1), prev_balance=0.0, adjustments=0.0, prev_voice=1602, prev_data=1287192, line='4030133964', txn_type='VOC', txn_at='2017-12-27 19:46:57-08:00', place='INCOMI CL', sent_recv=None, to_from='4030133964', in_plan=1, in_network=0, mins=3, type_unit=None)

In [5]:
# count of rows in df
print("count: {}".format(df.count()))

count: 57911


In [6]:
# filter
plan4000 = df.filter(df.plan_id == 'PLAN4000')
plan4000.count()

14673

In [8]:
# filter with sql
df.createOrReplaceTempView('plans')
plan4000 = spark.sql("SELECT account_id FROM plans where plan_id = 'PLAN4000'")

In [12]:
print("Plan count: {}".format(plan4000.count()))

Plan count: 100


In [13]:
# select semantics (w/ distinct)
plan4000 = df.select('account_id').distinct()
print("Distinct Account IDs: {}".format(plan4000.count()))

Distinct Account IDs: 100


In [21]:
# SQL aggregates
plan_counts = spark.sql("""
    select 
        count(*) cnt, 
        plan_id 
    from 
        plans 
    group by 
        2 
    order by 
        1 desc 
""")
plan_counts.show()

+-----+--------+
|  cnt| plan_id|
+-----+--------+
|19514|PLAN1000|
|14673|PLAN4000|
|11964|PLAN3000|
|11760|PLAN2000|
+-----+--------+



In [33]:
# df aggregates
plan_counts = (df.groupby(df.plan_id)
               .agg(f.count(f.lit(1)).alias('cnt'))
              )

plan_counts.show()

+--------+-----+
| plan_id|  cnt|
+--------+-----+
|PLAN1000|19514|
|PLAN3000|11964|
|PLAN4000|14673|
|PLAN2000|11760|
+--------+-----+



In [None]:
plan_counts = df.groupby

In [None]:
# aggregates