In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import monotonically_increasing_id, to_timestamp, row_number, rand, col, date_format, when
from pyspark.sql.window import Window
import glob
import subprocess
import pandas as pd
import pyarrow.hdfs as hdfs

data_path = "/home/hduser/Desktop/CA2/Data/Unzipped/"
hdfs_data_path = "/CA2/Data/Unzipped/"
columns = ["dailyRecordId","userid","tweetid","text","hashtags","language"]
schema = StructType([StructField(col, StringType(), True) for col in columns])
datetimeHeader = "tweetcreatedts"
columns.append(datetimeHeader)
print(columns)
datetimeColumn = StructField(datetimeHeader, TimestampType(), True)
schema = StructType(schema.fields + [datetimeColumn])

spark = SparkSession.builder.appName("CSVtoSpark").getOrCreate()
fs = hdfs.connect(host='localhost', port=9000, user='hduser')


['dailyRecordId', 'userid', 'tweetid', 'text', 'hashtags', 'language', 'tweetcreatedts']


  fs = hdfs.connect(host='localhost', port=9000, user='hduser')


In [60]:
def loadNormalisedDf(path):
    temp_df = spark.read.csv(path, header=True)
    return temp_df

def setColumns(df):
    df = df.withColumnRenamed("_c0", "dailyRecordId")
    df = df.withColumn('timestamp',
                   when(to_timestamp(datetimeHeader, 'yyyy-MM-dd HH:mm:ss.SSSSSS').isNotNull(),
                        to_timestamp(datetimeHeader, 'yyyy-MM-dd HH:mm:ss.SSSSSS'))
                   .when(to_timestamp(datetimeHeader, 'MMM-dd-yyyy HH:mm:ss').isNotNull(),
                         to_timestamp(datetimeHeader, 'MMM-dd-yyyy HH:mm:ss')))
    df = df.withColumn(datetimeHeader, date_format(datetimeHeader, "yyyy-MM-dd"))
    df = df.filter(df.tweetcreatedts.isNotNull())
    result_df = df.select(columns).filter("language = 'en'")
    sampled_df = result_df.sample(0.001, seed=42).limit(30)   
    return sampled_df

def takeSamplePerDate(df, noOfSamples):
    print("Number of records before filtering is " + str(df.count()))
    w = Window.partitionBy(datetimeHeader).orderBy(rand())
    df_with_id = df.withColumn('row_id', row_number().over(w))
    df_with_id = df_with_id.filter(df_with_id.row_id <= noOfSamples)
    print("Number of records after filtering is " + str(df_with_id.count()))
    return df_with_id.drop('row_id')

# this method ignores the value of datetimeHeader and must be updated manually if that value changes
def RemoveNullDates(df):
    date = df.filter(df.tweetcreatedts.isNotNull()).select("tweetcreatedts").first()[0]
    return df.fillna({"tweetcreatedts": date})

In [61]:
def getAllFilePaths(directory):
    files = glob.glob(directory + "*.csv")
    print(files)
    return files

def getAllHdfsPaths(hdfs_directory):
    # declare the hadoop fs -ls command to list all CSV files in the directory
    cmd = "hadoop fs -ls {}/*.csv | awk '{{print $NF}}'".format(hdfs_data_path)
    
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
    
    output = proc.stdout.read().decode() # read output and decode byte stream
    output = output.strip().split() # remove whitespace and convert to array and return
    return output

#print(csv_paths)

In [62]:
df_combined = spark.createDataFrame([], schema)
panda_df = pd.DataFrame()
for path in getAllHdfsPaths(data_path):
    if fs.exists(path):
        new_df = loadNormalisedDf(path)
        new_df = setColumns(new_df)
        converted_df = new_df.toPandas()       
        panda_df = pd.concat([panda_df, converted_df], ignore_index=True)
print(panda_df.shape)

2023-05-04 21:54:39,952 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0401_UkraineCombinedTweetsDeduped.csv
2023-05-04 21:54:44,504 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0402_UkraineCombinedTweetsDeduped.csv
2023-05-04 21:54:48,117 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 21:56:26,358 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0425_UkraineCombinedTweetsDeduped.csv
2023-05-04 21:56:29,465 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0426_UkraineCombinedTweetsDeduped.csv
2023-05-04 21:56:32,614 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 21:57:59,097 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0521_UkraineCombinedTweetsDeduped.csv
2023-05-04 21:58:08,746 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0522_UkraineCombinedTweetsDeduped.csv
2023-05-04 21:58:18,110 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:01:44,472 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0616_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:01:50,094 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0617_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:01:55,641 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:03:47,609 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0710_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:03:51,861 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0711_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:03:56,761 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:05:29,462 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0803_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:05:35,328 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0804_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:05:39,165 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:06:20,217 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0827_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:06:21,503 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0828_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:06:22,624 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:07:00,677 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0920_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:07:03,921 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/0921_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:07:08,638 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:07:58,419 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/1014_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:07:59,988 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/1015_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:08:01,500 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:08:33,381 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/1107_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:08:35,049 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/1108_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:08:36,404 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:09:12,969 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/1201_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:09:14,614 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/1202_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:09:16,388 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:09:50,173 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/1225_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:09:52,362 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/1226_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:09:53,899 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/

2023-05-04 22:10:25,458 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230118_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:10:26,816 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230119_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:10:28,185 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/C

2023-05-04 22:11:04,225 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230210_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:11:05,800 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230211_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:11:07,343 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/C

2023-05-04 22:12:17,496 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230305_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:12:20,853 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230306_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:12:25,023 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/C

2023-05-04 22:13:52,594 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230328_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:13:57,573 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230329_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:14:02,061 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/C

2023-05-04 22:15:16,104 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230421_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:15:17,393 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/20230422_UkraineCombinedTweetsDeduped.csv
2023-05-04 22:15:19,152 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/C

2023-05-04 22:16:19,141 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/UkraineCombinedTweetsDeduped_MAR13.csv
2023-05-04 22:16:21,519 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Data/Unzipped/UkraineCombinedTweetsDeduped_MAR14.csv
2023-05-04 22:16:24,100 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , userid, tweetid, tweetcreatedts, text, hashtags, language
 Schema: _c0, userid, tweetid, tweetcreatedts, text, hashtags, language
Expected: _c0 but found: 
CSV file: hdfs://localhost:9000/CA2/Dat

(7504, 7)


In [63]:
panda_df.head(4)

Unnamed: 0,dailyRecordId,userid,tweetid,text,hashtags,language,tweetcreatedts
0,27067,891030643135610881,1509720862840213509,"""#Ukraine: Various previously unseen Russian l...","[{'text': 'Ukraine', 'indices': [15, 23]}, {'t...",en,2022-04-01
1,27551,1166303518870761473,1509721671787196444,"""#Ukraine: Another Ukrainian ambush against Ru...","[{'text': 'Ukraine', 'indices': [15, 23]}, {'t...",en,2022-04-01
2,28405,1496705355988054016,1509723106574749701,#Ukraine: Here is video from this position in ...,"[{'text': 'Ukraine', 'indices': [15, 23]}, {'t...",en,2022-04-01
3,31154,23917612,1509727113917739013,Fascist Russia has neither legal nor moral rig...,"[{'text': 'Ukraine', 'indices': [90, 98]}, {'t...",en,2022-04-01


In [64]:
panda_df = panda_df.sort_values(by='tweetcreatedts')

In [67]:
print(panda_df['tweetcreatedts'].head(1))
print(panda_df['tweetcreatedts'].tail(1))
print(panda_df.size)
print(panda_df.shape)

6513    2022-02-24
Name: tweetcreatedts, dtype: object
6483    2023-04-28
Name: tweetcreatedts, dtype: object
52528
(7504, 7)


In [70]:
unique_count = panda_df['tweetcreatedts'].nunique()
print(unique_count)

422


In [66]:
def testSingleFile(path): 
    print(path)
    new_df = loadNormalisedDf(path)
    #new_df = setColumns(new_df)
    df = new_df.withColumnRenamed("_c0", "dailyRecordId")
    #df.select('tweetcreatedts').show()
    #df.show()
    #df = df.withColumn(datetimeHeader, to_timestamp(datetimeHeader, "yyyy-MM-dd HH:mm:ss.SSSSSS"))
    
    df = df.withColumn(datetimeHeader, date_format(datetimeHeader, "yyyy-MM-dd"))
    df = df.filter(df.tweetcreatedts.isNotNull())
    #df.show()
    
    df.select('tweetcreatedts').show()
    #result_df = df.select(columns).filter("language = 'en'")
    #sampled_df = result_df.sample(0.001, seed=42).limit(30)   
    
    
#files = getAllHdfsPaths(data_path)
#path = files[-1]
#testSingleFile('/CA2/Data/Unzipped/20230106_UkraineCombinedTweetsDeduped.csv')

In [71]:
import os

# get the current working directory
cwd = os.getcwd()

# print the current working directory
print("Current working directory:", cwd)

Current working directory: /home/hduser/Desktop/CA2


In [73]:

# save the dataframe to a CSV file
panda_df.to_csv('outputWithoutIndex.csv', index=False)
panda_df.to_csv('outputWithIndex.csv', index=True)