In [1]:
import os
import shutil
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark import SparkContext

In [2]:
# Set Spark properties for the Spark context. PySpark requires setSystemProperty
# to be called before the SparkContext is instantiated.
# NOTE(review): in "local" mode the work runs inside the driver JVM, so
# spark.executor.memory may have no effect here — confirm whether driver memory
# (which must be set before the JVM starts) was what was intended.
SparkContext.setSystemProperty('spark.executor.memory', '50g')
SparkContext.setSystemProperty("spark.ui.port", "9800")

# getOrCreate reuses an already-running context instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-executed.
spark_conf = SparkConf().setMaster("local").setAppName("orc_writer")
sc = SparkContext.getOrCreate(spark_conf)
sql_sc = SQLContext(sc)

## Input directories

In [3]:
# Root folder holding one sub-directory per dataset. Override it with the
# DATASET_DIR environment variable when the notebook runs from another directory.
input_dir = os.environ.get('DATASET_DIR', '../../dataset')

In [9]:
# Sub-directories of input_dir whose CSV files are converted below.
datasets = "nyt tweets".split()

In [10]:
# Map each dataset name to the CSV file names found directly in its folder.
# Only names ending in '.csv' are kept; a substring test would also match
# e.g. 'x.csv.gz' or 'x.csv.bak'. os.listdir returns entries in arbitrary
# order, so the list is sorted to make the processing order reproducible.
dataset_d = {}
for dataset in datasets:
    dataset_d[dataset] = sorted(
        fn for fn in os.listdir(os.path.join(input_dir, dataset)) if fn.endswith('.csv')
    )

# Read CSV files and write them as ORC, JSON, and Parquet

In [11]:
# Convert every discovered CSV file into ORC, JSON and Parquet copies, stored at
# <input_dir>/<dataset>/<format>/<file name without extension>.
# mode("overwrite") replaces output left by a previous run. Unlike a local
# shutil.rmtree, it cannot fail silently, so the cell can be re-run safely.
# NOTE(review): df is not cached, so each write re-reads the CSV; consider
# df.persist() if the inputs are large.
output_formats = ['orc', 'json', 'parquet']

for dataset_dir, fn_ls in dataset_d.items():
    for fn in fn_ls:
        input_file_path = os.path.join(input_dir, dataset_dir, fn)
        print("Reading {}".format(input_file_path))

        df = sql_sc.read.csv(input_file_path, header='true', inferSchema='true')

        # splitext strips only the final extension instead of assuming a
        # 4-character suffix.
        stem = os.path.splitext(fn)[0]
        output_dir = os.path.join(input_dir, dataset_dir)

        for fmt in output_formats:
            output_file_path = os.path.join(output_dir, fmt, stem)
            df.write.mode("overwrite").format(fmt).save(output_file_path)
            print("Completed {}".format(output_file_path))

        print("\n" + "--" * 10 + "\n")

Reading ../../dataset/nyt/ArticlesFeb2017.csv
Completed ../../dataset/nyt/orc/ArticlesFeb2017
Completed ../../dataset/nyt/json/ArticlesFeb2017
Completed ../../dataset/nyt/parquet/ArticlesFeb2017

--------------------

Reading ../../dataset/nyt/ArticlesMarch2017.csv
Completed ../../dataset/nyt/orc/ArticlesMarch2017
Completed ../../dataset/nyt/json/ArticlesMarch2017
Completed ../../dataset/nyt/parquet/ArticlesMarch2017

--------------------

Reading ../../dataset/nyt/ArticlesJan2018.csv
Completed ../../dataset/nyt/orc/ArticlesJan2018
Completed ../../dataset/nyt/json/ArticlesJan2018
Completed ../../dataset/nyt/parquet/ArticlesJan2018

--------------------

Reading ../../dataset/nyt/CommentsJan2017.csv
Completed ../../dataset/nyt/orc/CommentsJan2017
Completed ../../dataset/nyt/json/CommentsJan2017
Completed ../../dataset/nyt/parquet/CommentsJan2017

--------------------

Reading ../../dataset/nyt/CommentsFeb2017.csv
Completed ../../dataset/nyt/orc/CommentsFeb2017
Completed ../../dataset/ny

# Test a written file

In [31]:
# Read one ORC output back to check that the conversion worked.
# The path is built from input_dir so it follows the same root as the writer cell.
output_file = os.path.join(input_dir, 'nyt', 'orc', 'CommentsApril2017')
orc_df = sql_sc.read.orc(output_file)

In [34]:
# Display the DataFrame repr: the column names and types read back from the ORC output.
# NOTE(review): every column shows up as `string` even though the CSV was read
# with inferSchema='true'. Type inference may have failed on this file; confirm
# before relying on these types.
orc_df

DataFrame[approveDate: string, commentBody: string, commentID: string, commentSequence: string, commentTitle: string, commentType: string, createDate: string, depth: string, editorsSelection: string, parentID: string, parentUserDisplayName: string, permID: string, picURL: string, recommendations: string, recommendedFlag: string, replyCount: string, reportAbuseFlag: string, sharing: string, status: string, timespeople: string, trusted: string, updateDate: string, userDisplayName: string, userID: string, userLocation: string, userTitle: string, userURL: string, inReplyTo: string, articleID: string, sectionName: string, newDesk: string, articleWordCount: string, printPage: string, typeOfMaterial: string]

In [16]:
# Scratch check: write the ORC data read back above out as JSON into ./test.
# This uses orc_df, which is defined in this section. The original used `df`,
# which exists only as whatever loop value the conversion cell left behind.
# "overwrite" replaces the default error-if-exists mode so a re-run does not
# fail on the existing ./test directory.
orc_df.write.mode("overwrite").format("json").save('test')