In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd

- I/O to HDFS

In [3]:
# HDFS locations. Spark writes DST_PATH as a *directory* of part-files, despite the .csv suffix.
SRC_PATH = '/user/peizhou.liao/giraph/data/Airports2.csv'
DST_PATH = '/user/peizhou.liao/giraph/data/Airports_new.csv'

# inferSchema=True types the numeric columns (Seats, Passengers, ...) instead of reading
# everything as strings. Without it, sorts are lexicographic and stat.corr/approxQuantile
# reject the columns. It costs one extra pass over the file.
flight = spark.read.csv(SRC_PATH, sep=',', header=True, inferSchema=True)

# mode='overwrite' keeps the cell re-runnable (the default mode raises if the path exists).
# header=True keeps the column names so the output can be read back the same way.
flight.write.save(DST_PATH, format='csv', mode='overwrite', header=True)

In [None]:
# peek at the first rows; use flight.show(5, False) to disable truncation of long cells
flight.show(n=5)

- Data filtering

In [None]:
# count the rows whose `Seats` value is null (a mid-cell expression would not be displayed)
n_null_seats = flight.filter(F.col('Seats').isNull()).count()
print(f'rows with null Seats: {n_null_seats}')

# display a single column
flight.select('Origin_city').show()

# filter rows (transformations are lazy: nothing executes until an action such as show/count)
flight_atl = flight.where(F.col('Origin_airport') == 'ATL')
flight_has_origin = flight.where(F.col('Origin_airport').isNotNull())

# filter rows and select some columns
flight_atl_cities = flight_atl.select('Origin_city', 'Destination_city')

# concatenate dataframes (union matches columns by position; unionByName matches by name)
flight_dup = flight.union(flight)

# keep distinct rows
flight_uniq = flight_dup.distinct()
flight_airport = flight.select('Origin_airport').distinct()

# sort dataframe by column; the cast guarantees numeric (not lexicographic) order
flight.sort(F.col('Seats').cast('double').desc()).show(n=10)

# groupby and aggregate
flight_by_origin = flight.groupBy('Origin_airport').agg(
    F.sum('Seats').alias('total_seats'),
    F.countDistinct('Destination_airport').alias('n_destinations'),
)
flight_by_origin.show(5)

- Statistical analysis

In [None]:
# sample ~10% of rows without replacement (the fraction is approximate, not an exact count)
flight_sample = flight.sample(False, 0.1, seed=11)  # flight.sample(True, 0.1, seed=11) with replacement

# count frequency of each origin airport
# (groupBy rather than cube: cube also emits a grand-total row whose key is null)
origin_counts = flight.groupBy('Origin_airport').count()

# stat.corr and approxQuantile require numeric columns; cast in case they were read as strings
flight_num = (
    flight
    .withColumn('Passengers', F.col('Passengers').cast('double'))
    .withColumn('Seats', F.col('Seats').cast('double'))
)

# compute (Pearson) correlation
passenger_seat_corr = flight_num.stat.corr('Passengers', 'Seats')
print(f'corr(Passengers, Seats) = {passenger_seat_corr}')

# compute percentiles with a relative error of 0.01
passenger_quantiles = flight_num.approxQuantile(
    'Passengers', [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.99], 0.01
)
print(f'Passengers quantiles: {passenger_quantiles}')

# merge two dataframes: attach each origin airport's total passengers to every flight row
# (the original example joined undefined `data`/`traffic` frames)
origin_traffic = flight_num.groupBy('Origin_airport').agg(
    F.sum('Passengers').alias('origin_passengers')
)
flight_with_traffic = flight.join(origin_traffic, on=['Origin_airport'], how='left_outer')

- UDF

In [None]:
def _ts_to_date(ts):
    """Return the text before the first whitespace of `ts` (the date of a 'DATE TIME' string).

    None, empty and whitespace-only strings map to None instead of raising inside the executor.
    """
    if ts is None:
        return None
    parts = ts.split()
    return parts[0] if parts else None


# Timestamp string -> date string. If a Spark built-in (e.g. F.to_date) covers the use case,
# prefer it: a Python UDF ships every row to a Python worker.
ts2date = F.udf(_ts_to_date, returnType=StringType())

# UDF with error handling
def ts2hour(s):
    """Return the hour of a 'DATE HH:MM[:SS]' string as an int, or None if it cannot be parsed."""
    try:
        return int(s.split()[1].split(':')[0])
    except (AttributeError, IndexError, TypeError, ValueError):
        # AttributeError: s is None / not a string; IndexError: no time part;
        # TypeError: non-str input (e.g. bytes); ValueError: hour is not an integer.
        return None
# Wrap the plain function directly. A `lambda x: ts2hour(x)` wrapper adds nothing and makes
# the default output column name `<lambda>(...)` instead of `ts2hour(...)`.
ts2Hour = F.udf(ts2hour, returnType=IntegerType())

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, udf

# Inputs: the original cell used `<...>` pseudo-code here, which is a SyntaxError and
# stopped the whole cell. Bind real objects before calling `add_weighted_sum`:
#   df            : DataFrame with a `feature_vector` column of pyspark.ml.linalg.SparseVector
#   model_weights : numpy array, presumably one weight per vector dimension (TODO confirm)


def get_weighted_sum(model_weights, user_feature_vector):
    """Return the L1-normalised dot product of a feature vector with `model_weights`.

    Args:
        model_weights: weights accepted by ``user_feature_vector.dot`` (e.g. a numpy array).
        user_feature_vector: object exposing ``dot`` and ``norm`` (e.g. a SparseVector), or None.

    Returns:
        ``dot(model_weights) / norm(1)`` as a plain Python ``float``. Returns ``None`` when the
        vector is None or its L1 norm is zero, because the ratio is undefined.
    """
    if user_feature_vector is None:
        return None
    l1_norm = user_feature_vector.norm(1)
    if l1_norm == 0:
        return None
    # float(): dot/norm typically return numpy scalars, which a DoubleType UDF
    # cannot serialise back to the JVM.
    return float(user_feature_vector.dot(model_weights) / l1_norm)


def get_weighted_sum_udf(model_weights):
    """Build a DoubleType UDF that applies `get_weighted_sum` with `model_weights` bound.

    `model_weights` is captured in the closure and serialised with the UDF. For a large
    array, consider broadcasting it via `spark.sparkContext.broadcast`.
    """
    return udf(lambda feature_vector: get_weighted_sum(model_weights, feature_vector), DoubleType())


def add_weighted_sum(df, model_weights):
    """Return a one-column DataFrame `weighted_sum` computed from `df.feature_vector`."""
    return df.select(get_weighted_sum_udf(model_weights)(col('feature_vector')).alias('weighted_sum'))


# Example, once `df` and `model_weights` are bound:
#     new_df = add_weighted_sum(df, model_weights)

- Useful operations

In [None]:
# pull two struct fields out of the first element of the array-of-struct column `ids`
first_id = df['ids'].getItem(0)
df = (
    df
    .withColumn('idType', first_id.getField('idType'))
    .withColumn('idValue', first_id.getField('idValue'))
)

- Two ways to convert an RDD to a DataFrame

In [None]:
# toDF()
from pyspark.sql.types import Row
# Helper: turn a positional record into a dict keyed by its stringified column index.
def f(x):
    """Return {'0': x[0], '1': x[1], ...} for an indexable sequence `x`."""
    return {str(i): x[i] for i in range(len(x))}
# Build one Row per record (fields '0', '1', ...) and let toDF() infer the column types.
# Note: before Spark 3.0, Row(**kwargs) sorts fields by name, so '10' would precede '2'.
rows = rdd.map(lambda record: Row(**f(record)))
df = rows.toDF()

# createDataFrame(rdd, schema): declare the schema explicitly instead of inferring it.
# Uses the `spark` session (as the read above does); SQLContext is deprecated since Spark 3.0.
N_COLS = 32  # presumably fields per record; must match the RDD's records (TODO confirm)
schema = StructType([StructField(str(i), StringType(), True) for i in range(N_COLS)])
df = spark.createDataFrame(rdd, schema)