In [4]:
import sys
import os




import pandas as pd
import requests
from datetime import datetime
from bs4 import BeautifulSoup

from operations.mongodb import *


# Evaluated once, when this cell/module runs; used as the default crawl date below.
cur_date = datetime.now().strftime("%d-%m-%Y")

# Data-fetching function
def getting_movie_name(crawl_date=cur_date):
    """Scrape the Box Office Mojo worldwide chart for the year of ``crawl_date``.

    Args:
        crawl_date: Crawl date as a ``DD-MM-YYYY`` string. The part after the
            second ``-`` is used as the chart year, and the whole string is
            stored in the ``Execution Date`` column. Defaults to ``cur_date``,
            which is computed once when this cell/module is evaluated.

    Returns:
        pandas.DataFrame with the columns ``Execution Date``, ``Name`` and
        ``Year``: one row per ``a.a-link-normal`` link inside the chart table
        whose ``href`` contains ``/releasegroup/``.

    Raises:
        TypeError: If the page does not answer with HTTP 200, or if the
            response contains no ``<table>`` element. (The exception type is
            kept from the original implementation for caller compatibility.)
    """
    # Use the requested date, not the module-level default, so that a
    # back-fill for another date hits the right year and is labelled correctly.
    year = crawl_date.split('-')[2]

    url = f"https://www.boxofficemojo.com/year/world/{year}/"
    # A timeout keeps a stalled connection from hanging the whole pipeline.
    res = requests.get(url, timeout=30)

    # Fail early when the site did not answer with a normal page
    if res.status_code != 200:
        raise TypeError(f"Can't find data in {year}")

    print(f"Connected successfully to {year}")
    soup = BeautifulSoup(res.text, 'html.parser')

    # Narrow the search to the chart table. The tag name and the class filter
    # are passed separately; a single dict argument is not a class filter.
    # Fall back to the first table on the page if the class is absent.
    table = soup.find("table", class_="mojo-body-table")
    if table is None:
        table = soup.find("table")
    if table is None:
        raise TypeError(f"Can't find data in {year}")

    # Keep only the links that point at a release group, i.e. the movie titles
    movies_name = [
        {
            "Execution Date": crawl_date,
            "Name": link.text.strip(),
            "Year": year,
        }
        for link in table.select('a.a-link-normal')
        if '/releasegroup/' in link.get('href', '')
    ]
    # Report the number of movies actually kept, not the number of links seen
    print(f"Getting Successfully {len(movies_name)} movies in {year}")

    return pd.DataFrame(data=movies_name,
                        columns=['Execution Date', 'Name', 'Year'])

def loading_movies_name(crawl_date : str):
    """Scrape the movie titles for ``crawl_date`` and store the unseen ones in MongoDB.

    Titles are compared with the documents already present in the
    ``movies_db.movies_name`` collection on the ``(Name, Year)`` pair; only
    rows whose pair is not yet stored are inserted.

    Connection settings can be overridden with the environment variables
    ``MONGO_USERNAME``, ``MONGO_PASSWORD``, ``MONGO_HOST`` and ``MONGO_PORT``;
    when unset, the previous hard-coded values are used.

    Args:
        crawl_date: Crawl date as a ``DD-MM-YYYY`` string, forwarded to
            ``getting_movie_name``.
    """
    with connect_mongodb(username=os.environ.get("MONGO_USERNAME", 'ndtien'),
                         password=os.environ.get("MONGO_PASSWORD", 'ndtien'),
                         host=os.environ.get("MONGO_HOST", 'mongodb'),
                         port=os.environ.get("MONGO_PORT", '27017')) as client:

        client_op = MongoDB_Operation(client=client)

        # Create the database/collection if this is the first run
        client_op.create_database(db_name="movies_db")
        client_op.create_collection(db_name="movies_db",collection_name="movies_name")

        # Fetch the titles stored by previous runs
        old_movies_name = client_op.find_data(db_name="movies_db",collection_name="movies_name",
                                              query={})
        # find_data's return type is not visible here: this function used to
        # iterate it as documents, while another caller in this file indexes
        # it like a DataFrame. Accept both shapes (and an empty/None result).
        if old_movies_name is None:
            old_records = []
        elif isinstance(old_movies_name, pd.DataFrame):
            old_records = old_movies_name.to_dict("records")
        else:
            old_records = list(old_movies_name)
        old_movie_keys = {(doc.get('Name'), doc.get('Year')) for doc in old_records}

        # Scrape the titles for the requested date
        new_movies_name = getting_movie_name(crawl_date=crawl_date)

        # Keep only the rows whose (Name, Year) pair is not stored yet. An
        # explicit boolean Series also behaves well when nothing was scraped.
        is_new = pd.Series(
            [
                (name, year) not in old_movie_keys
                for name, year in zip(new_movies_name['Name'], new_movies_name['Year'])
            ],
            index=new_movies_name.index,
            dtype=bool,
        )
        unique_movies = new_movies_name.loc[is_new]

        # Insert only when there is something new
        if not unique_movies.empty:
            client_op.insert_many(db_name="movies_db",collection_name="movies_name",
                                data=unique_movies)
            # Report the date that was actually crawled, not the module default
            print(f"Have {len(unique_movies)} new movie was added in {crawl_date}")
        else:
            print("Don't have new movie in this day")


import requests
import pandas as pd
import requests
from datetime import datetime

from operations.mongodb import *

# Evaluated once, when this cell/module runs; used as the default crawl date below.
cur_date = datetime.now().strftime("%d-%m-%Y")

# Data-fetching function
def get_movie_data(list_movies_name, crawl_date = cur_date):
    """Query the TMDB ``/search/movie`` endpoint once per movie title.

    The API key is read from the ``TMDB_API_KEY`` environment variable; when
    it is unset, the previously hard-coded key is used.

    Args:
        list_movies_name: Iterable of movie titles to search for.
        crawl_date: Value stored in the ``Execution Date`` column of every
            returned row. Defaults to ``cur_date``, which is computed once
            when this cell/module is evaluated.

    Returns:
        pandas.DataFrame with the columns ``Execution Date`` and ``Results``;
        one row per title, where ``Results`` holds the ``results`` entry of
        the JSON response for that title.

    Raises:
        TypeError: If any request does not answer with HTTP 200. (The
            exception type is kept from the original implementation for
            caller compatibility.)
    """
    # NOTE(review): the fallback key is still committed to the notebook;
    # consider rotating it and requiring the environment variable instead.
    API_KEY = os.environ.get("TMDB_API_KEY", "2d6e1b290dabf74f65b84431677db2b8")
    BASE_URL = "https://api.themoviedb.org/3"
    END_POINT = "/search/movie"

    # The URL does not depend on the title, so build it once
    url = f"{BASE_URL}{END_POINT}"

    movies_data = []
    for name in list_movies_name:
        params = {
            'api_key': API_KEY,
            'query': name,
            'language': 'en-US',
            'page': 1
        }

        # A timeout keeps one stalled request from hanging the whole batch
        response = requests.get(url, params=params, timeout=30)

        if response.status_code != 200:
            raise TypeError(f"Can't find movie {name}")

        movies_data.append({
            "Execution Date"    :   crawl_date,
            "Results"           :   response.json().get('results'),
        })

    # The column names must match the dict keys above; a misspelled name
    # yields an all-NaN column and drops the real one.
    return pd.DataFrame(data=movies_data, columns=['Execution Date', 'Results'])

def loading_movies_data(crawl_date: str):
    """Fetch TMDB search results for the titles stored for ``crawl_date``.

    Reads the documents of ``movies_db.movies_name`` whose ``Execution Date``
    equals ``crawl_date`` and passes their ``Name`` values to
    ``get_movie_data``.

    Connection settings can be overridden with the environment variables
    ``MONGO_USERNAME``, ``MONGO_PASSWORD``, ``MONGO_HOST`` and ``MONGO_PORT``;
    when unset, the previous hard-coded values are used.

    Args:
        crawl_date: Crawl date as a ``DD-MM-YYYY`` string.

    Returns:
        The DataFrame returned by ``get_movie_data`` (empty when no title is
        stored for ``crawl_date``).
    """
    with connect_mongodb(username=os.environ.get("MONGO_USERNAME", 'ndtien'),
                         password=os.environ.get("MONGO_PASSWORD", 'ndtien'),
                         host=os.environ.get("MONGO_HOST", 'mongodb'),
                         port=os.environ.get("MONGO_PORT", '27017')) as client:
        client_op = MongoDB_Operation(client=client)

        # Look up the titles that were stored for the requested date
        found = client_op.find_data(db_name="movies_db",collection_name="movies_name",
                                    query={"Execution Date" : crawl_date})

        # find_data's return type is not visible here: this function used to
        # index it like a DataFrame, while another caller in this file
        # iterates it as documents. Normalise both shapes to a DataFrame.
        if found is None:
            names_df = pd.DataFrame()
        elif isinstance(found, pd.DataFrame):
            names_df = found
        else:
            names_df = pd.DataFrame(list(found))

        # No match means no "Name" column; treat that as "nothing to fetch"
        if 'Name' in names_df.columns:
            new_movies_name = names_df['Name'].tolist()
        else:
            new_movies_name = []

    # The HTTP calls do not need the MongoDB connection, so run them after
    # the connection has been released.
    new_movies_data = get_movie_data(new_movies_name,crawl_date=crawl_date)

    return new_movies_data

ModuleNotFoundError: No module named 'operations'

In [2]:


from datetime import datetime

from gettingdata.getting_movies_daily import *
from gettingdata.getting_name_daily import *

from operations.sparkprocess import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, FloatType, ArrayType

# Spark schema with one nullable field per attribute of a movie record.
# Built field by field; the order of the .add() calls is the column order.
tmdb_movie_schema = (
    StructType()
    .add("adult", BooleanType(), True)
    .add("backdrop_path", StringType(), True)
    .add("genre_ids", ArrayType(IntegerType()), True)
    .add("id", IntegerType(), True)
    .add("original_language", StringType(), True)
    .add("original_title", StringType(), True)
    .add("overview", StringType(), True)
    .add("popularity", FloatType(), True)
    .add("poster_path", StringType(), True)
    # Kept as a string; could become DateType if it is converted first
    .add("release_date", StringType(), True)
    .add("title", StringType(), True)
    .add("video", BooleanType(), True)
    .add("vote_average", FloatType(), True)
    .add("vote_count", IntegerType(), True)
)

def _tmdb_results_to_rows(movies_df, schema):
    """Flatten the per-title ``Results`` lists into tuples ordered like ``schema``."""
    float_fields = {
        field.name for field in schema.fields if isinstance(field.dataType, FloatType)
    }
    rows = []
    for results in movies_df["Results"]:
        # A response without a usable result list contributes no rows
        if not isinstance(results, list):
            continue
        for movie in results:
            row = []
            for field in schema.fields:
                value = movie.get(field.name)
                # JSON may deliver whole numbers as int; make them float so
                # they fit the fields declared as FloatType.
                if value is not None and field.name in float_fields:
                    value = float(value)
                row.append(value)
            rows.append(tuple(row))
    return rows


def bronze_process():
    """Collect today's movie data and write it to the bronze layer on HDFS.

    Steps: store today's new titles (``loading_movies_name``), fetch their
    search results (``loading_movies_data``), flatten the nested result lists
    into one row per movie matching ``tmdb_movie_schema``, and write the rows
    as parquet to ``hdfs://namenode:8020/movies_data/bronze_layer``.

    Nothing is written when no movie rows were produced, so an empty day does
    not replace the existing bronze data with an empty dataset.
    """
    cur_date = datetime.now().strftime("%d-%m-%Y")

    with spark_session(master="spark://spark-master:7077",
                    appName="Bronze Process",
                    jars=["org.mongodb.spark:mongo-spark-connector_2.12:10.4.1"],
                    config={
                        # Same NameNode address as the write path below
                        # (the previous value used port 9870).
                        "spark.hadoop.fs.defaultFS":"hdfs://namenode:8020",
                        "spark.sql.execution.arrow.pyspark.enabled": "true"
                        }
                    ) as spark:

        spark_op = Spark_Operation(spark=spark)

        # Fetch today's new data
        loading_movies_name(crawl_date=cur_date)
        new_movies_df = loading_movies_data(crawl_date=cur_date)

        # new_movies_df has one row per searched title with a nested list of
        # movies in "Results"; the schema describes a single movie, so the
        # lists are flattened to one row per movie before building the frame.
        movie_rows = _tmdb_results_to_rows(new_movies_df, tmdb_movie_schema)
        if not movie_rows:
            print(f"No movie data to write for {cur_date}")
            return

        new_movies_spark_df = spark.createDataFrame(movie_rows, schema=tmdb_movie_schema)

        # Write the data to the bronze layer.
        # NOTE(review): mode="overwrite" replaces whatever is already stored
        # at this path on every run — confirm that "append" is not intended.
        spark_op.write_hdfs(df=new_movies_spark_df,
                            path="hdfs://namenode:8020/movies_data/bronze_layer",
                            format="parquet",
                            mode="overwrite")
    
# Run the bronze stage, framed by the same banner line before and after.
_bronze_banner = "================== Bronze Process =================="
print(_bronze_banner)
bronze_process()
print(_bronze_banner)



ModuleNotFoundError: No module named 'gettingdata'

In [19]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, FloatType, ArrayType

# NOTE(review): this schema (movie_results / person_results / ...) is not
# applied to the read below, and it does not describe the columns that
# printSchema() reports for the loaded data (adult, backdrop_path, budget, ...).
# It is kept only because later code may still reference the name — confirm
# whether it can be removed.
schema = StructType([
    StructField("movie_results", ArrayType(StructType([
        StructField("adult", BooleanType(), True),
        StructField("backdrop_path", StringType(), True),
        StructField("id", IntegerType(), True),
        StructField("title", StringType(), True),
        StructField("original_language", StringType(), True),
        StructField("original_title", StringType(), True),
        StructField("overview", StringType(), True),
        StructField("poster_path", StringType(), True),
        StructField("media_type", StringType(), True),
        StructField("genre_ids", ArrayType(IntegerType(), True), True),
        StructField("popularity", FloatType(), True),
        StructField("release_date", StringType(), True),
        StructField("video", BooleanType(), True),
        StructField("vote_average", FloatType(), True),
        StructField("vote_count", IntegerType(), True)
    ]), True)),
    StructField("person_results", ArrayType(StructType([]), True), True),
    StructField("tv_results", ArrayType(StructType([]), True), True),
    StructField("tv_episode_results", ArrayType(StructType([]), True), True),
    StructField("tv_season_results", ArrayType(StructType([]), True), True)
])

# Keyword arguments of DataFrameReader.parquet() are reader options, and
# `schema` is not one of them, so the former `schema=schema` argument had no
# effect: the schema stored in the parquet files is what gets loaded.
# NOTE(review): this reads ".../bronze.parquet", while bronze_process() writes
# to ".../bronze_layer" — confirm which location is the current bronze output.
df = spark.read.parquet("hdfs://namenode:8020/movies_data/bronze.parquet")

                                                                                

#### Xử lý các giá trị null

In [20]:
# Show the column names and types of the DataFrame loaded from the bronze parquet data.
df.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- budget: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: long (valueContainsNull = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- origin_country: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: long (valueContainsNull

In [21]:
# Build cleaned_df from the original df: in each listed text column a NULL
# is replaced by the fixed placeholder string mapped to it.
string_defaults = {
    "backdrop_path": "default_backdrop.jpg",
    "overview": "No overview available",
    "poster_path": "default_poster.jpg",
    "homepage": "No homepage",
}

cleaned_df = df
for text_col, placeholder in string_defaults.items():
    filled = when(col(text_col).isNull(), placeholder).otherwise(col(text_col))
    cleaned_df = cleaned_df.withColumn(text_col, filled)

In [22]:
# Numeric columns: a NULL is replaced by 0, any other value is kept as is.
numeric_cols = ["budget", "popularity", "vote_count", "vote_average", "runtime"]
for col_name in numeric_cols:
    current_value = col(col_name)
    zero_filled = when(current_value.isNull(), 0).otherwise(current_value)
    cleaned_df = cleaned_df.withColumn(col_name, zero_filled)

In [23]:
# Boolean columns: a NULL flag is replaced by False, any other value is kept.
boolean_cols = ["adult", "video"]
for col_name in boolean_cols:
    flag = col(col_name)
    false_filled = when(flag.isNull(), False).otherwise(flag)
    cleaned_df = cleaned_df.withColumn(col_name, false_filled)

In [24]:
# Array columns: a NULL is replaced by an empty array, any other value is kept.
array_cols = ["genres", "origin_country", "production_companies", "spoken_languages"]
for col_name in array_cols:
    existing = col(col_name)
    empty_filled = when(existing.isNull(), array()).otherwise(existing)
    cleaned_df = cleaned_df.withColumn(col_name, empty_filled)

In [25]:
from pyspark.sql.functions import create_map, lit

# A NULL belongs_to_collection is replaced by a map holding the keys "id" and
# "name", both with NULL values; an existing map is kept as is.
# NOTE(review): the two NULLs are cast to different types (long and string) —
# confirm the resulting column type is what downstream code expects.
collection = col("belongs_to_collection")
placeholder_collection = create_map(
    lit("id"), lit(None).cast("long"),
    lit("name"), lit(None).cast("string"),
)
cleaned_df = cleaned_df.withColumn(
    "belongs_to_collection",
    when(collection.isNull(), placeholder_collection).otherwise(collection),
)

In [26]:
# release_date, tagline and imdb_id: a NULL is replaced by a fixed sentinel
# value, applied in this order; any other value is kept as is.
sentinel_defaults = {
    "release_date": "1900-01-01",
    "tagline": "No tagline",
    "imdb_id": "tt0000000",
}
for sentinel_col, sentinel in sentinel_defaults.items():
    cleaned_df = cleaned_df.withColumn(
        sentinel_col,
        when(col(sentinel_col).isNull(), sentinel).otherwise(col(sentinel_col)),
    )

In [27]:
# Print a preview of cleaned_df to check the result of the null replacements.
cleaned_df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+-----+--------------------+---------------------+---------+--------------------+--------------------+-------+----------+--------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+-----+------------+----------+
|adult|       backdrop_path|belongs_to_collection|   budget|              genres|            homepage|     id|   imdb_id|origin_country|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|video|vote_average|vote_count|
+-----+--------------------+---------------------+---------+--------------------+--------------------+-------+----------+--------------+-----------------+--------------------+---------------

                                                                                

In [28]:
# Persist the cleaned data as parquet, replacing any previous output at the path.
# NOTE(review): the path is spelled "sliver" (possibly meant "silver"); it is
# left unchanged because readers of this location are not visible here.
silver_path = "hdfs://namenode:8020/movies_data/sliver.parquet"
silver_writer = cleaned_df.write.format("parquet").mode("overwrite")
silver_writer.save(silver_path)

In [29]:
# Stop `spark` now that the cleaned data has been written.
spark.stop()