In [80]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HDFSToSpark") \
    .getOrCreate()


In [81]:
hdfs_path = "hdfs:///user/ProjectTweets.csv"  # HDFS'deki dosyanın yolunu belirtin

df = spark.read.csv(hdfs_path, header=True, inferSchema=True)



                                                                                

In [90]:
df.show()

+---+----------+----------+--------+---------------+--------------------+
|  0|       ids|      date|    flag|           user|                text|
+---+----------+----------+--------+---------------+--------------------+
|  1|1467810672|07/04/2009|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|07/04/2009|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|07/04/2009|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|07/04/2009|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|07/04/2009|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|07/04/2009|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|07/04/2009|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|07/04/2009|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|  9|1467812025|07/04/2009|NO_QUERY|        mimismo|@twittera que me ...|
| 10|1467812416|07/04/2009|NO_QUERY| erinx3leannexo|spring break in p...|
| 11|1467812579|07/04/2009|NO_QUERY|  

In [83]:
# Sütun başlıklarını tanımlayın
new_columns = ["ids", "date", "flag", "user", "text"]

# Mevcut veri çerçevesinin sütun adlarını güncelleyin
for i, column_name in enumerate(new_columns):
    df = df.withColumnRenamed(df.columns[i + 1], column_name)

# DataFrame'i gösterin
df.show()

+---+----------+--------------------+--------+---------------+--------------------+
|  0|       ids|                date|    flag|           user|                text|
+---+----------+--------------------+--------+---------------+--------------------+
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|  9|1467812025|Mon Apr 06 22:20:...|NO_QUERY|        mimismo|@twittera que 

In [89]:
# PySpark DataFrame'deki toplam satır sayısını alın
row_count = df.count()

# Sonucu yazdırın
print("Toplam Satır Sayısı:", row_count)


[Stage 65:>                                                         (0 + 2) / 2]

Toplam Satır Sayısı: 1599999


                                                                                

In [85]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


In [88]:
# "date" sütununu alın
date_column = df.select("date")

# Sonucu gösterin
date_column.show(5, truncate=False)


+----------+
|date      |
+----------+
|07/04/2009|
|07/04/2009|
|07/04/2009|
|07/04/2009|
|07/04/2009|
+----------+
only showing top 5 rows



In [92]:
# Veri çerçevesinin sütun veri tiplerini alın
column_data_types = df.dtypes

# Sonuçları görüntüle
for column_name, data_type in column_data_types:
    print(f"Sütun Adı: {column_name}, Veri Türü: {data_type}")


Sütun Adı: 0, Veri Türü: int
Sütun Adı: ids, Veri Türü: bigint
Sütun Adı: date, Veri Türü: string
Sütun Adı: flag, Veri Türü: string
Sütun Adı: user, Veri Türü: string
Sütun Adı: text, Veri Türü: string


In [87]:
from pyspark.sql.functions import to_date, date_format

# "date" sütununu önce datetime veri türüne dönüştür
df = df.withColumn("date", to_date(df["date"], "EEE MMM dd HH:mm:ss zzz yyyy"))

# Dönüştürülmüş tarihi "dd/MM/yyyy" formatına dönüştür
df = df.withColumn("date", date_format(df["date"], "dd/MM/yyyy"))


In [48]:
from pyspark.sql.functions import col

# Yıl, ay, gün, saat, dakika ve saniyeye göre sıralanmış bir sütun oluştur
df = df.withColumn("timestamp", col("date").cast("timestamp"))


In [52]:
from pyspark.sql.functions import window

# Verileri belirli bir zaman periyoduna göre grupla ve topla
df_grouped = df.groupBy(window("timestamp", "1 day")).count()


In [69]:
df.show()

[Stage 46:>                                                         (0 + 2) / 2]

+------+----------+-------------------+--------+---------------+--------------------+
|     0|       ids|               date|    flag|           user|                text|
+------+----------+-------------------+--------+---------------+--------------------+
|799999|2329205794|2009-06-25 18:28:31|NO_QUERY|     tpchandler|has to resit exam...|
|799997|2329205473|2009-06-25 18:28:30|NO_QUERY|       LeeLHoke|rest in peace Far...|
|799998|2329205574|2009-06-25 18:28:30|NO_QUERY|   davidlmulder|@Eric_Urbane Soun...|
|799996|2329205038|2009-06-25 18:28:28|NO_QUERY|        bigenya|     Gmail is down? |
|799994|2329204987|2009-06-25 18:28:28|NO_QUERY|      360cookie|Tried to get the ...|
|799995|2329205009|2009-06-25 18:28:28|NO_QUERY|       dandykim|Sick  Spending my...|
|799992|2329204790|2009-06-25 18:28:27|NO_QUERY|      CJROSE218|@koolgirl37 read ...|
|799993|2329204835|2009-06-25 18:28:27|NO_QUERY|        mattfca|My life  http://m...|
|799991|2329204705|2009-06-25 18:28:27|NO_QUERY|  love

