<a href="https://colab.research.google.com/github/PuchToTalk/Football_market-value/blob/Airflow-dag/my_dag.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Steps 1 & 2: DAG structure to load our two CSV files

Method 1 :

In [None]:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd



with DAG(
       'my_first_dag',
       default_args={
           'depends_on_past': False,
           'email': ['airflow@example.com'],
           'email_on_failure': False,
           'email_on_retry': False,
           'retries': 1,
           'retry_delay': timedelta(minutes=5),
       },
       description='A first DAG',
       schedule=None,
       start_date=datetime(2023, 1, 1),
       catchup=False,
       tags=['example'],

) as dag:
   dag.doc_md = """
       This is my DAG in airflow for the Big Data Project.
       I can write documentation in Markdown here with **bold text** or __bold text__.
   """

   import io

   import pandas as pd
   import pyarrow as pa
   import pyarrow.parquet as pq

   # Airflow re-executes this file every time it parses the DAG folder, so the
   # task functions below are only handed to the operators and never called at
   # this level (calling them here would read the CSVs and rewrite the Parquet
   # files on every parse).

   def _as_frame(data):
       """Return ``data`` as a DataFrame, parsing it first if it is a JSON records string."""
       if isinstance(data, pd.DataFrame):
           return data
       return pd.read_json(io.StringIO(data))

   def load_foot_perf():
       """Load foot_perf.csv, snapshot it to Parquet and return it as JSON records.

       A JSON string (not a DataFrame) is returned because the return value of
       a PythonOperator callable is pushed to XCom.
       """
       print("load foot perf task")
       # Load the foot_perf.csv file
       foot_perf_data = pd.read_csv("/Users/paulc/airflow/dags/foot_perf.csv")

       # Convert the DataFrame to a PyArrow Table and write it as Parquet
       output_1 = 'output_foot_perf.parquet'
       pq.write_table(pa.Table.from_pandas(foot_perf_data), output_1)

       # Read the Parquet file back into a DataFrame
       foot_perf_data = pq.read_table(output_1).to_pandas()
       return foot_perf_data.to_json(orient='records')

   def load_market_value():
       """Load only_mv2.csv, snapshot it to Parquet and return it as JSON records."""
       print("load market value task")
       # Load the market value file
       market_value_data = pd.read_csv("/Users/paulc/airflow/dags/only_mv2.csv")

       # Convert the DataFrame to a PyArrow Table and write it as Parquet
       output_2 = 'output_foot_value.parquet'
       pq.write_table(pa.Table.from_pandas(market_value_data), output_2)

       # Read the Parquet file back into a DataFrame
       market_value_data = pq.read_table(output_2).to_pandas()
       return market_value_data.to_json(orient='records')

   def clean_foot_perf(foot_perf_data=None):
       """Return the foot-perf data as JSON records (no cleaning is applied yet).

       Args:
           foot_perf_data: JSON records string or DataFrame, e.g. the output of
               ``load_foot_perf()``. When omitted (as when Airflow runs the
               task), the Parquet snapshot written by ``load_foot_perf`` is read.

       Returns:
           str: the data as JSON records.
       """
       print("cleaned foot perf data task")
       if foot_perf_data is None:
           # Read the Parquet snapshot back into a DataFrame
           foot_perf_data = pq.read_table('output_foot_perf.parquet').to_pandas()
       return _as_frame(foot_perf_data).to_json(orient='records')

   def clean_market_value(market_value_data=None, ti=None):
       """Drop the unnamed index column and the Nation/Pos columns.

       Args:
           market_value_data: JSON records string or DataFrame. When omitted it
               is pulled from the ``source_to_raw2`` XCom.
           ti: Airflow task instance, passed in by the PythonOperator because
               the parameter is named ``ti``.

       Returns:
           str: the filtered data as JSON records.

       Raises:
           ValueError: if no data is given and there is no task instance.
       """
       print("formatted market value data task")
       if market_value_data is None:
           if ti is None:
               raise ValueError("market_value_data is required when not run by Airflow")
           market_value_data = ti.xcom_pull(task_ids='source_to_raw2')
       market_value_data = _as_frame(market_value_data)
       # The first column is the CSV's unnamed index column
       filtered_data = market_value_data.drop(columns=market_value_data.columns[0])
       filtered_data = filtered_data.drop(columns=["Nation", "Pos"])
       return filtered_data.to_json(orient='records')

   def join_datasets(df_football_stats=None, market_value_data=None, ti=None):
       """Inner-join the foot-perf and market-value data on ``Player``.

       Either input may be a DataFrame or a JSON records string. An input left
       as None is pulled from the XCom of the matching formatting task.

       Returns:
           str: the merged data as JSON records.

       Raises:
           ValueError: if an input is missing and there is no task instance.
       """
       print("join 2 datasets task ")
       if df_football_stats is None or market_value_data is None:
           if ti is None:
               raise ValueError("both datasets are required when not run by Airflow")
           if df_football_stats is None:
               df_football_stats = ti.xcom_pull(task_ids='raw_to_formatted1')
           if market_value_data is None:
               market_value_data = ti.xcom_pull(task_ids='raw_to_formatted2')
       # Join key: the capitalised 'Player' column
       merged_data = _as_frame(df_football_stats).merge(
           _as_frame(market_value_data), on='Player', how='inner')
       return merged_data.to_json(orient='records')

   def index_to_elastic(merged_data=None):
       """Placeholder for indexing the merged data into ElasticSearch (not implemented)."""
       print("merged data task")
       # TODO: index merged_data into ElasticSearch

   source_to_raw1 = PythonOperator(
       task_id='source_to_raw1',
       python_callable=load_foot_perf,
   )

   source_to_raw2 = PythonOperator(
       task_id='source_to_raw2',
       python_callable=load_market_value,
   )

   raw_to_formatted1 = PythonOperator(
       task_id='raw_to_formatted1',
       python_callable=clean_foot_perf,
   )

   raw_to_formatted2 = PythonOperator(
       task_id='raw_to_formatted2',
       python_callable=clean_market_value,
   )

   produce_usage = PythonOperator(
       task_id='produce_usage',
       python_callable=join_datasets,
   )

   # Named *_task so it does not shadow the index_to_elastic function, which
   # the local run below still needs to call.
   index_to_elastic_task = PythonOperator(
       task_id='index_to_elastic',
       python_callable=index_to_elastic,
   )

   source_to_raw1 >> raw_to_formatted1
   source_to_raw2 >> raw_to_formatted2
   [raw_to_formatted1, raw_to_formatted2] >> produce_usage >> index_to_elastic_task

   # Local, Airflow-free run of the whole pipeline.
   if __name__ == '__main__':
       df = clean_foot_perf(load_foot_perf())
       df1 = clean_market_value(load_market_value())
       df2 = join_datasets(df, df1)
       index_to_elastic(df2)

Method 2: JSON -> DataFrame -> JSON approach

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import DagRun
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Defaults applied to every task: no e-mails, one retry after five minutes.
default_args = dict(
    depends_on_past=False,
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Manually triggered DAG (schedule=None) with catch-up disabled.
dag = DAG(
    dag_id='my_first_dag',
    description='A first DAG',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['example'],
)

def load_foot_perf():
    """Load foot_perf.csv, snapshot it to Parquet and return it as JSON records.

    Returns:
        str: the CSV content serialised with ``to_json(orient='records')``;
        Airflow pushes it to XCom, where ``clean_foot_perf`` pulls it from the
        ``source_to_raw1`` task.
    """
    print("load foot perf task")
    foot_perf_data = pd.read_csv("/Users/paulc/airflow/dags/foot_perf.csv")
    print(foot_perf_data)
    foot_perf_data_parquet = pa.Table.from_pandas(foot_perf_data)
    output_1 = 'output_foot_perf.parquet'
    pq.write_table(foot_perf_data_parquet, output_1)
    foot_perf_data = foot_perf_data.to_json(orient='records')
    print(foot_perf_data)
    return foot_perf_data

def load_market_value():
    """Read only_mv2.csv, write a Parquet copy, and hand the rows on as JSON records."""
    print("load market value task")
    frame = pd.read_csv("/Users/paulc/airflow/dags/only_mv2.csv")
    pq.write_table(pa.Table.from_pandas(frame), 'output_foot_value.parquet')
    # XCom carries the records-oriented JSON string, not the DataFrame itself.
    return frame.to_json(orient='records')

def clean_foot_perf(ti):
    """Round-trip the raw foot-perf records (JSON -> DataFrame -> JSON).

    No cleaning is applied yet. The input is the JSON pushed by the
    ``source_to_raw1`` task.
    """
    print("cleaned foot perf data task")
    raw_json = ti.xcom_pull(task_ids='source_to_raw1')
    frame = pd.read_json(raw_json)
    print(frame)
    return frame.to_json(orient='records')


def clean_market_value(ti):
    """Round-trip the raw market-value records (JSON -> DataFrame -> JSON).

    No cleaning is applied yet. The input is the JSON pushed by the
    ``source_to_raw2`` task.
    """
    print("formatted market value data task")
    raw_json = ti.xcom_pull(task_ids='source_to_raw2')
    frame = pd.read_json(raw_json)
    print(frame)
    return frame.to_json(orient='records')


def join_datasets(ti):
    """Inner-join the two formatted datasets on the ``Player`` column.

    Both upstream tasks push records-oriented JSON strings to XCom. They are
    parsed back into DataFrames before merging. The result is serialised the
    same way so it can itself travel through XCom.

    Returns:
        str: the merged rows as JSON records.
    """
    import io

    print("join 2 datasets task")
    # StringIO: passing literal JSON text to read_json is deprecated in pandas.
    foot_perf_data = pd.read_json(io.StringIO(ti.xcom_pull(task_ids='raw_to_formatted1')))
    market_value_data = pd.read_json(io.StringIO(ti.xcom_pull(task_ids='raw_to_formatted2')))
    # Join foot_perf_data and market_value_data; the key column is 'Player' (capitalised).
    merged_data = foot_perf_data.merge(market_value_data, on='Player', how='inner')
    return merged_data.to_json(orient='records')

def index_to_elastic(ti):
    """Placeholder for the ElasticSearch indexing step.

    Fetches the merged records produced by the ``combine`` task but does not
    index them yet.
    """
    print("index to ElasticSearch task")
    merged_data = ti.xcom_pull(task_ids='combine')  # noqa: F841 - not used until indexing exists
    # TODO: index merged_data into ElasticSearch

with dag:
    # Extraction: read each CSV and push it to XCom as JSON.
    source_to_raw1 = PythonOperator(task_id='source_to_raw1', python_callable=load_foot_perf)
    source_to_raw2 = PythonOperator(task_id='source_to_raw2', python_callable=load_market_value)

    # Formatting: one task per dataset.
    raw_to_formatted1 = PythonOperator(task_id='raw_to_formatted1', python_callable=clean_foot_perf)
    raw_to_formatted2 = PythonOperator(task_id='raw_to_formatted2', python_callable=clean_market_value)

    # Usage: merge both datasets, then index the result.
    combine = PythonOperator(task_id='combine', python_callable=join_datasets)
    # The right-hand side is evaluated first, so the function is captured before
    # its module-level name is rebound to the operator.
    index_to_elastic = PythonOperator(task_id='index_to_elastic', python_callable=index_to_elastic)

    # Each formatting step waits for its own source; the merge waits for both.
    raw_to_formatted1 << source_to_raw1
    raw_to_formatted2 << source_to_raw2
    [raw_to_formatted1, raw_to_formatted2] >> combine >> index_to_elastic


Step 3: adding the merge

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import DagRun
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from elasticsearch import Elasticsearch, helpers


# Task defaults: notifications off, a single retry five minutes after a failure.
default_args = dict(
    depends_on_past=False,
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Only runs when triggered (schedule=None); past intervals are not caught up.
dag = DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='A first DAG',
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['example'],
)

def load_foot_perf():
    """Read foot_perf.csv, mirror it to Parquet, and return it as JSON records for XCom."""
    print("load foot perf task")
    frame = pd.read_csv("/Users/paulc/airflow/dags/foot_perf.csv")
    print(frame)
    pq.write_table(pa.Table.from_pandas(frame), 'output_foot_perf.parquet')
    records_json = frame.to_json(orient='records')
    print(records_json)
    return records_json

def load_market_value():
    """Read only_mv2.csv, mirror it to Parquet, and return it as JSON records for XCom."""
    print("load market value task")
    frame = pd.read_csv("/Users/paulc/airflow/dags/only_mv2.csv")
    pq.write_table(pa.Table.from_pandas(frame), 'output_foot_value.parquet')
    return frame.to_json(orient='records')

def clean_foot_perf(ti):
    """Pass the raw foot-perf records from ``source_to_raw1`` through a DataFrame.

    No cleaning happens yet; the data is returned as JSON records.
    """
    print("cleaned foot perf data task")
    frame = pd.read_json(ti.xcom_pull(task_ids='source_to_raw1'))
    print(frame)
    return frame.to_json(orient='records')


def clean_market_value(ti):
    """Pass the raw market-value records from ``source_to_raw2`` through a DataFrame.

    No cleaning happens yet; the data is returned as JSON records.
    """
    print("formatted market value data task")
    frame = pd.read_json(ti.xcom_pull(task_ids='source_to_raw2'))
    print(frame)
    return frame.to_json(orient='records')


def join_datasets(ti):
    """Inner-join the formatted perf and market-value records on ``Player``.

    Both inputs arrive from XCom as records-oriented JSON. The merged table is
    returned in the same form.
    """
    print("join 2 datasets task")
    perf = pd.read_json(ti.xcom_pull(task_ids='raw_to_formatted1'))
    value = pd.read_json(ti.xcom_pull(task_ids='raw_to_formatted2'))
    print(value)
    print(perf)

    # Only players present in both datasets are kept.
    merged = perf.merge(value, on='Player', how='inner')
    print(merged)
    return merged.to_json(orient='records')


def index_to_elastic(ti):
    """Bulk-index the merged records from the ``combine`` task into Elasticsearch.

    Every row of the merged data becomes one document in ``equipe_nation_index``.

    Raises:
        elasticsearch.helpers.BulkIndexError: if any document failed to index.
    """
    print("Index to Elasticsearch task")
    merged_data = ti.xcom_pull(task_ids='combine')
    merged_data_to_elastic = pd.read_json(merged_data)

    # Create an Elasticsearch instance
    es = Elasticsearch(['http://localhost:9200'])  # Modify the URL if necessary

    index_name = 'equipe_nation_index'  # Elasticsearch index name
    doc_type = 'equipe'  # Elasticsearch document type (deprecated starting from version 7.x)

    actions = [
        {
            "_index": index_name,
            "_type": doc_type,
            "_source": row.to_dict(),
        }
        for _, row in merged_data_to_elastic.iterrows()
    ]

    # With raise_on_error=False, bulk() returns the failed items as the second
    # element of its result instead of raising on the first failing chunk, so
    # they can be reported below.
    _, errors = helpers.bulk(es, actions, raise_on_error=False)

    if errors:
        print("Error indexing data to Elasticsearch.")
        print(errors)
        # BulkIndexError is reached through the already-imported helpers module.
        raise helpers.BulkIndexError("%i document(s) failed to index." % len(errors), errors)
    print("Data has been successfully indexed to Elasticsearch.")

with dag:
    # Extraction: read each CSV and push it to XCom as JSON.
    source_to_raw1 = PythonOperator(task_id='source_to_raw1', python_callable=load_foot_perf)
    source_to_raw2 = PythonOperator(task_id='source_to_raw2', python_callable=load_market_value)

    # Formatting: one task per dataset.
    raw_to_formatted1 = PythonOperator(task_id='raw_to_formatted1', python_callable=clean_foot_perf)
    raw_to_formatted2 = PythonOperator(task_id='raw_to_formatted2', python_callable=clean_market_value)

    # Usage: merge both datasets, then bulk-index the result.
    combine = PythonOperator(task_id='combine', python_callable=join_datasets)
    # The function is captured before its module-level name is rebound to the operator.
    index_to_elastic = PythonOperator(task_id='index_to_elastic', python_callable=index_to_elastic)

    raw_to_formatted1 << source_to_raw1
    raw_to_formatted2 << source_to_raw2
    [raw_to_formatted1, raw_to_formatted2] >> combine >> index_to_elastic


Step 4: working indexing

In [None]:
# Standalone (non-Airflow) check of the bulk indexing.
# `connections` comes from elasticsearch_dsl and must be imported before use.
import pandas as pd
from elasticsearch import helpers
from elasticsearch_dsl import connections

# Create an Elasticsearch instance
es_client = connections.create_connection(hosts=['http://localhost:9200/'])

test_df = pd.read_csv("foot_perf.csv")


def indexing_function(df):
    """Yield one bulk "index" action per row of ``df`` for ``equipe_nation_index``."""
    # NOTE(review): `_source` is a pandas Series; this relies on the client's
    # serializer turning it into a JSON object. Confirm that, or use
    # `document.to_dict()` to be explicit.
    for index, document in df.iterrows():
        yield {
            "_index": 'equipe_nation_index',
            "_type": "_doc",
            "_source": document,
        }


helpers.bulk(es_client, indexing_function(test_df))

------

# Final DAG

my_dag.py

In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import connections

# Import all other Python Files' functions
from index_elastic import indexing_function
from formatting_functions import interval_value, interval_age, interval_dribble, interval_goal, interval_match, interval_pass
from team_by_nation import filtering_players_by_nation




# Defaults applied to every task: no e-mails, one retry after five minutes.
default_args = dict(
    depends_on_past=False,
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Manually triggered (schedule=None) with catch-up disabled.
dag = DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='A first DAG',
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['example'],
)

def load_foot_perf():
    """Step 1.1: read the performance CSV, snapshot it to Parquet, return JSON records.

    The records-oriented JSON string is what gets pushed to XCom for the
    formatting task.
    """
    print("Step 1.1 : Load foot perf task")
    raw_df = pd.read_csv("/Users/paulc/airflow/dags/CSV/foot_perf.csv")
    print(f"Our first CSV file raw data : {raw_df}")

    # Parquet snapshot of the raw data
    parquet_path = '/Users/paulc/airflow/dags/Parquet/load_foot_perf.parquet'
    pq.write_table(pa.Table.from_pandas(raw_df), parquet_path)

    # XCom cannot carry a DataFrame, so pass the rows on as JSON.
    records_json = raw_df.to_json(orient='records')
    print(f"The JSON file : {records_json}")
    return records_json

#foot_perf_data = load_foot_perf()


def load_market_value():
    """Step 1.2: read the market-value CSV, snapshot it to Parquet, return JSON records."""
    print("Step 1.2 : Load market value task")
    raw_df = pd.read_csv("/Users/paulc/airflow/dags/CSV/only_mv2.csv")
    print(f"Our second CSV file raw data : {raw_df}")

    # Parquet snapshot of the raw data
    parquet_path = '/Users/paulc/airflow/dags/Parquet/load_market_value.parquet'
    pq.write_table(pa.Table.from_pandas(raw_df), parquet_path)

    # XCom cannot carry a DataFrame, so pass the rows on as JSON.
    records_json = raw_df.to_json(orient='records')
    print(f"The JSON file : {records_json}")
    return records_json

#market_value_data = load_market_value()






def clean_foot_perf(ti):
    """Step 2.1: turn the raw per-90-minutes statistics into season totals.

    Pulls the JSON records pushed by ``source_to_raw1``, keeps the identity
    columns, and converts the statistics used downstream into whole-number
    totals (``value * Min / 90``). It also derives the pass and dribble
    completion percentages. The result is saved to
    ``Formatted_result/formatted_football_stats.csv`` (relative to the working
    directory) and returned as JSON records.

    Output columns, in order: Player, Nation, Pos, Squad, Comp, Age, MP,
    Goals, Pass, PassComp%, Assist, Tackle_Won, DribbleComp%.
    """
    import io

    print("cleaned foot perf data task")
    raw_json = ti.xcom_pull(task_ids='source_to_raw1')
    # StringIO: passing literal JSON text to read_json is deprecated in pandas.
    foot_perf_data = pd.read_json(io.StringIO(raw_json))

    def per90_to_total(column):
        """Convert a per-90-minutes column into a rounded integer total."""
        return ((foot_perf_data[column] * foot_perf_data['Min']) / 90).round(0).astype(int)

    # Columns copied unchanged from the raw data. The output columns are named
    # explicitly instead of being picked by position.
    df_football_stats = pd.DataFrame()
    for column in ['Player', 'Nation', 'Pos', 'Squad', 'Comp', 'Age', 'MP']:
        df_football_stats[column] = foot_perf_data[column]

    df_football_stats['Goals'] = per90_to_total('Goals')
    df_football_stats['Pass'] = per90_to_total('PasTotAtt')
    pass_completed = per90_to_total('PasTotCmp')
    df_football_stats['PassComp%'] = ((pass_completed / df_football_stats['Pass']) * 100).round(2)
    df_football_stats['Assist'] = per90_to_total('Assists')
    df_football_stats['Tackle_Won'] = per90_to_total('TklWon')
    dribbles_won = per90_to_total('DriSucc')
    dribbles = per90_to_total('DriAtt')
    df_football_stats['DribbleComp%'] = ((dribbles_won / dribbles) * 100).round(2)

    # 0/0 completion rates (nothing attempted) are NaN; report them as 0.
    df_football_stats = df_football_stats.fillna(0)

    # exist_ok avoids a FileExistsError race: clean_market_value creates the same
    # folder, and the two formatting tasks do not depend on each other.
    folder_path = "Formatted_result"
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, "formatted_football_stats.csv")
    df_football_stats.to_csv(file_path, index=False)

    print(df_football_stats)
    return df_football_stats.to_json(orient='records')

#cleaned_df_perf_data = clean_foot_perf(foot_perf_data)






def clean_market_value(ti):
    """Step 2.2: normalise the raw market-value data.

    Pulls the JSON records pushed by ``source_to_raw2`` and drops the first
    (unnamed index) column. It turns the ``Market value`` text into a float in
    ``market_int`` by stripping ``m`` and ``€``, then drops Nation, Pos and
    Market value. The result is saved to
    ``Formatted_result/filtered_market_value.csv`` and returned as JSON records.
    """
    import io

    print("formatted market value data task")
    raw_json = ti.xcom_pull(task_ids='source_to_raw2')
    # StringIO: passing literal JSON text to read_json is deprecated in pandas.
    market_value_data = pd.read_json(io.StringIO(raw_json))

    # Delete the 1st column, which was the CSV's "Unnamed" index column.
    filtered_data = market_value_data.drop(columns=market_value_data.columns[0])

    # Strip the unit and currency characters (literal replacements, not regex)
    # and convert the remaining number to float.
    filtered_data['market_int'] = (
        filtered_data['Market value']
        .str.replace('m', '', regex=False)
        .str.replace("€", "", regex=False)
        .astype(float)
    )

    # Columns not needed downstream
    filtered_data = filtered_data.drop(columns=["Nation", "Pos", "Market value"])

    # exist_ok avoids a FileExistsError race: clean_foot_perf creates the same
    # folder, and the two formatting tasks do not depend on each other.
    folder_path = "Formatted_result"
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, "filtered_market_value.csv")
    filtered_data.to_csv(file_path, index=False)
    print(filtered_data)
    return filtered_data.to_json(orient='records')

#cleaned_df_value_data = clean_market_value(market_value_data)



def join_datasets(ti):
    """Step 3: left-join performance and market-value data and add interval labels.

    Players with no market value get half of the median market value. Only the
    first row per player is kept. The result is saved to
    ``Combined_result/merged_data.csv`` and returned as JSON records.
    """
    # Load both formatted datasets from XCom.
    print("join 2 datasets task")
    perf_df = pd.read_json(ti.xcom_pull(task_ids='raw_to_formatted1'))
    value_df = pd.read_json(ti.xcom_pull(task_ids='raw_to_formatted2'))
    print(value_df)
    print(perf_df)

    merged_df = pd.merge(perf_df, value_df, on='Player', how='left')
    fallback_value = round(merged_df['market_int'].median() / 2, 2)
    merged_df['market_int'] = merged_df['market_int'].fillna(fallback_value)

    # (new label column, source column, bucketing function), applied in this order.
    interval_columns = (
        ('Players Age interval', 'Age', interval_age),
        ('Match played interval', 'MP', interval_match),
        ('Goals interval', 'Goals', interval_goal),
        ('Successful Pass rate (%)', 'PassComp%', interval_pass),
        ('Successful Dribble rate (%)', 'DribbleComp%', interval_dribble),
        ('market_interval (M €)', 'market_int', interval_value),
    )
    for label_column, source_column, to_interval in interval_columns:
        merged_df[label_column] = merged_df[source_column].apply(to_interval)

    merged_df = merged_df.rename(columns={'market_int': 'MarketValue (M €)'})
    merged_df = merged_df.drop_duplicates(["Player"], keep="first")

    output_dir = "Combined_result"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    merged_df.to_csv(os.path.join(output_dir, "merged_data.csv"), index=False)
    print(merged_df)

    return merged_df.to_json(orient='records')

#merged_data = join_datasets(ti)



def nationalteam_selection(ti):
    """Step 4: pick each nation's squad from the merged player data.

    The selection itself is delegated to ``filtering_players_by_nation``. The
    result is printed (first 34 rows), saved to
    ``International_Team_Selection/players_by_nation_data.csv`` and returned as
    JSON records.
    """
    merged_players = pd.read_json(ti.xcom_pull(task_ids='combine'))
    selection = filtering_players_by_nation(merged_players)
    print(selection.head(34))

    output_dir = "International_Team_Selection"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    selection.to_csv(os.path.join(output_dir, "players_by_nation_data.csv"), index=False)

    return selection.to_json(orient='records')





def index_to_elastic(ti):
    """Step 5: bulk-index the selected national-team players into Elasticsearch.

    The bulk actions come from ``indexing_function``. The client is an
    elasticsearch_dsl connection to ``http://localhost:9200/``.
    """
    selection_json = ti.xcom_pull(task_ids='players_nation')
    selection = pd.read_json(selection_json)
    client = connections.create_connection(hosts=['http://localhost:9200/'])
    helpers.bulk(client, indexing_function(selection))














with dag:
    # Step 1 - extraction: one loader per CSV file.
    source_to_raw1 = PythonOperator(task_id='source_to_raw1', python_callable=load_foot_perf)
    source_to_raw2 = PythonOperator(task_id='source_to_raw2', python_callable=load_market_value)

    # Step 2 - formatting of each dataset.
    raw_to_formatted1 = PythonOperator(task_id='raw_to_formatted1', python_callable=clean_foot_perf)
    raw_to_formatted2 = PythonOperator(task_id='raw_to_formatted2', python_callable=clean_market_value)

    # Steps 3-5 - merge, national-team selection, then indexing.
    combine = PythonOperator(task_id='combine', python_callable=join_datasets)
    filter_players_nation = PythonOperator(task_id='players_nation', python_callable=nationalteam_selection)
    # The function is captured before its module-level name is rebound to the operator.
    index_to_elastic = PythonOperator(task_id='index_to_elastic', python_callable=index_to_elastic)

    raw_to_formatted1 << source_to_raw1
    raw_to_formatted2 << source_to_raw2
    [raw_to_formatted1, raw_to_formatted2] >> combine >> filter_players_nation >> index_to_elastic


index_elastic.py

In [None]:
# Create an Elasticsearch instance


def indexing_function(df, index_name='test10juin'):
    """Yield one Elasticsearch bulk "index" action per row of ``df``.

    Intended to be passed to ``elasticsearch.helpers.bulk``.

    Args:
        df: DataFrame whose rows become documents.
        index_name: target index; defaults to the index used so far.

    Yields:
        dict: an action with ``_index``, ``_type`` and ``_source``, where
        ``_source`` is the row as a plain ``{column: value}`` dict.
    """
    for _, row in df.iterrows():
        yield {
            "_index": index_name,
            "_type": "_doc",
            # A plain dict always serialises as a JSON object, whichever client
            # serializer is used; a raw pandas Series depends on the serializer.
            "_source": row.to_dict(),
        }

formatting_functions.py

In [None]:
# Create all useful functions which aims to help to manipulate our data

def interval_age(age_int):
    """Return the age-bracket label for ``age_int``.

    Brackets are half-open, ``[low, high[``: under 18, 18-22, 22-24, 24-25,
    25-28, 28-30, 30-35, and 35 or more. An age of exactly 18 now lands in
    "[18, 22[". Values that compare false against every bound (e.g. NaN)
    give "".
    """
    if age_int < 18:
        return "[- de 18]"  # under 18
    elif age_int < 22:
        return "[18, 22["
    elif age_int < 24:
        return "[22, 24["
    elif age_int < 25:
        return "[24, 25["
    elif age_int < 28:
        return "[25, 28["
    elif age_int < 30:
        return "[28, 30["
    elif age_int < 35:
        return "[30, 35["
    elif age_int >= 35:
        return "[35 ou +]"
    else:
        return ""


# to be moved into another Python file
def interval_match(match_int):
    """Return the matches-played bracket label for ``match_int``.

    Brackets: below 1, then half-open ``[low, high[`` ranges 1-5, 5-10,
    10-15, 15-20, 20-25, and 25 or more. Values that compare false against
    every bound (e.g. NaN) give "".
    """
    if match_int < 1:
        return "[- de 1]"  # fewer than one match
    upper_bounds = (
        (5, "[1, 5["),
        (10, "[5, 10["),
        (15, "[10, 15["),
        (20, "[15, 20["),
        (25, "[20, 25["),
    )
    for upper, label in upper_bounds:
        if match_int < upper:
            return label
    if match_int >= 25:
        return "[25 ou +]"
    return ""


# to be moved into another Python file
def interval_goal(match_int):
    """Return the goals bracket label for ``match_int`` (a goal count).

    Brackets: below 10, then half-open ``[low, high[`` ranges 10-20, 20-25,
    25-30, 30-35, and 35 or more. Exactly 10 goals now lands in "[10, 20[".
    Values that compare false against every bound (e.g. NaN) give "".
    """
    if match_int < 10:
        return "[- de 10]"  # fewer than 10 goals
    elif match_int < 20:
        return "[10, 20["
    elif match_int < 25:
        return "[20, 25["
    elif match_int < 30:
        return "[25, 30["
    elif match_int < 35:
        return "[30, 35["
    elif match_int >= 35:
        return "[35 ou +]"
    else:
        return ""


# to be moved into another Python file
def interval_pass(PassComp):
    """Return the pass-completion-rate bracket label for ``PassComp`` (a percentage).

    Brackets: below 15, then half-open ranges 15-30, 30-50, 50-60, 60-80,
    80-90, 90-100 (labelled "[90, 100]"), and exactly 100. Exactly 15 now lands
    in "[15, 30[". Values above 100, or values that compare false against every
    bound (e.g. NaN), give "".
    """
    if PassComp < 15:
        return "[- de 15]"  # below 15 %
    elif PassComp < 30:
        return "[15, 30["
    elif PassComp < 50:
        return "[30, 50["
    elif PassComp < 60:
        return "[50, 60["
    elif PassComp < 80:
        return "[60, 80["
    elif PassComp < 90:
        return "[80, 90["
    elif PassComp < 100:
        return "[90, 100]"
    elif PassComp == 100:
        return "[100]"
    else:
        return ""


# to be moved into another Python file
def interval_dribble(DribbleComp):
    """Return the dribble-success-rate bracket label for ``DribbleComp`` (a percentage).

    Brackets: below 15, then half-open ranges 15-30, 30-50, 50-60, 60-80,
    80-90, 90-100 (labelled "[90, 100]"), and exactly 100. Exactly 15 now lands
    in "[15, 30[". Values above 100, or values that compare false against every
    bound (e.g. NaN), give "".
    """
    if DribbleComp < 15:
        return "[- de 15]"  # below 15 %
    elif DribbleComp < 30:
        return "[15, 30["
    elif DribbleComp < 50:
        return "[30, 50["
    elif DribbleComp < 60:
        return "[50, 60["
    elif DribbleComp < 80:
        return "[60, 80["
    elif DribbleComp < 90:
        return "[80, 90["
    elif DribbleComp < 100:
        return "[90, 100]"
    elif DribbleComp == 100:
        return "[100]"
    else:
        return ""


# to be moved into another Python file
def interval_value(market_int):
    """Return the market-value bracket label for ``market_int`` (millions of euros).

    Brackets: below 15, then half-open ranges 15-20, 20-30, 30-40, 40-50,
    50-60, and 60 or more. Exactly 15 now lands in "[15, 20[". Values that
    compare false against every bound (e.g. NaN) give "".
    """
    if market_int < 15:
        return "[- de 15]"  # below 15 M€
    elif market_int < 20:
        return "[15, 20["
    elif market_int < 30:
        return "[20, 30["
    elif market_int < 40:
        return "[30, 40["
    elif market_int < 50:
        return "[40, 50["
    elif market_int < 60:
        return "[50, 60["
    elif market_int >= 60:
        return "[60 ou +]"
    else:
        return ""


team_by_nation.py

In [None]:

import os
import pandas as pd


def filtering_players_by_nation(merged_data):
    """Select up to 11 players (10 outfield + 1 goalkeeper) per nation.

    Only nations with at least 11 players in ``merged_data`` are considered.
    Within a nation, players are ranked by ``MarketValue (M €)``, highest
    first. From that ranking the function keeps:
    - the first 10 non-GK players meeting at least one performance threshold;
    - the first goalkeeper with 10+ matches played or a market value of 15+.

    The selection is saved to ``Final_file_to_index/equipe_nation.csv`` and
    returned sorted by nation. A stable sort keeps each nation's outfield
    players in market-value order, followed by its goalkeeper.
    """
    print("merged data task")
    ranked = merged_data.sort_values(by='MarketValue (M €)', ascending=False)
    # Nations without enough players to field a team are excluded.
    eligible = ranked.groupby("Nation").filter(lambda group: len(group) >= 11)

    is_gk = eligible['Pos'] == "GK"
    meets_threshold = (
        (eligible['Goals'] >= 10) |
        (eligible['Assist'] >= 10) |
        (eligible['MP'] >= 20) |
        (eligible['Tackle_Won'] >= 20) |
        (eligible['PassComp%'] >= 60) |
        (eligible['DribbleComp%'] >= 60) |
        (eligible['MarketValue (M €)'] >= 15)
    )
    # groupby().head() keeps rows in their ranked order within each nation and,
    # unlike groupby().apply(), does not run a function over the grouping column.
    outfield_df = eligible[meets_threshold & ~is_gk].groupby("Nation").head(10)

    gk_ok = is_gk & ((eligible['MP'] >= 10) | (eligible['MarketValue (M €)'] >= 15))
    gk_df = eligible[gk_ok].groupby("Nation").head(1)

    filtered_df = pd.concat([outfield_df, gk_df])
    filtered_df = filtered_df.sort_values(by='Nation', kind='stable').reset_index(drop=True)

    # Create a folder named "Final_file_to_index" if it doesn't exist
    folder_path = "Final_file_to_index"
    os.makedirs(folder_path, exist_ok=True)

    # Save the DataFrame as a CSV file inside the "Final_file_to_index" folder
    file_path = os.path.join(folder_path, "equipe_nation.csv")
    filtered_df.to_csv(file_path, index=False)

    return filtered_df


Convert Python -> PySpark

In [None]:
def clean_foot_perf(ti):
    """Format the raw player-performance records produced by ``source_to_raw1``.

    Rate columns are turned into rounded whole-number totals with
    ``rate * Min / 90``. Success percentages are derived from those totals.
    Columns are selected by name rather than by position, so adding or
    removing intermediate columns cannot silently change the output.

    Args:
        ti: Airflow task instance; only ``ti.xcom_pull(task_ids=...)`` is used.
            The pulled value is expected to be a JSON records string.

    Returns:
        JSON string (``orient='records'``) with the columns Player, Nation,
        Pos, Squad, Comp, Age, MP, Goals, Pass, PassComp%, Assist,
        Tackle_Won, DribbleComp% (missing values filled with 0). The same
        table is written to ``Formatted_result/formatted_football_stats.csv``.
    """
    import io
    import os

    import pandas as pd

    print("cleaned foot perf data task")
    raw_json = ti.xcom_pull(task_ids='source_to_raw1')
    # Passing literal JSON to read_json is deprecated (pandas >= 2.1); use a buffer.
    foot_perf_data = pd.read_json(io.StringIO(raw_json))

    def per_match_total(column):
        # Presumably the source columns are per-90 rates: scale by minutes / 90.
        return ((foot_perf_data[column] * foot_perf_data['Min']) / 90).round(0).astype(int)

    def success_rate(made, attempted):
        # 0/0 yields NaN, which the fillna(0) below turns into 0.
        return ((made / attempted) * 100).round(2)

    df_football_stats = foot_perf_data[['Player', 'Nation', 'Pos', 'Squad', 'Comp', 'Age', 'MP']].copy()
    df_football_stats['Goals'] = per_match_total('Goals')
    passes = per_match_total('PasTotAtt')
    df_football_stats['Pass'] = passes
    df_football_stats['PassComp%'] = success_rate(per_match_total('PasTotCmp'), passes)
    df_football_stats['Assist'] = per_match_total('Assists')
    df_football_stats['Tackle_Won'] = per_match_total('TklWon')
    df_football_stats['DribbleComp%'] = success_rate(per_match_total('DriSucc'), per_match_total('DriAtt'))
    df_football_stats = df_football_stats.fillna(0)

    # Create a folder named "Formatted_result" if it doesn't exist
    folder_path = "Formatted_result"
    os.makedirs(folder_path, exist_ok=True)

    # Save the DataFrame as a CSV file inside the "Formatted_result" folder
    file_path = os.path.join(folder_path, "formatted_football_stats.csv")
    df_football_stats.to_csv(file_path, index=False)

    # Print the new Dataframe Result
    print(df_football_stats)
    cleaned_foot_perf_data = df_football_stats.to_json(orient='records')
    return cleaned_foot_perf_data

---------

# Méthode en Python (pandas)

In [None]:
def clean_foot_perf(ti):
    """Format the raw player-performance records produced by ``source_to_raw1``.

    Rate columns are turned into rounded whole-number totals with
    ``rate * Min / 90``. Success percentages are derived from those totals.

    The output columns are selected by NAME. The previous positional
    ``iloc[:, [0..6, 11]]`` picked ``PassCompleted`` at index 11 and dropped
    Goals, PassComp%, Assist, Tackle_Won and DribbleComp%, which the
    downstream join/filter steps read.

    Args:
        ti: Airflow task instance; only ``ti.xcom_pull(task_ids=...)`` is used.
            The pulled value is expected to be a JSON records string.

    Returns:
        JSON string (``orient='records'``) with the columns Player, Nation,
        Pos, Squad, Comp, Age, MP, Goals, Pass, PassComp%, Assist,
        Tackle_Won, DribbleComp% (missing values filled with 0). The same
        table is written to ``Formatted_result/formatted_football_stats.csv``.
    """
    import io
    import os

    import pandas as pd

    print("cleaned foot perf data task")
    raw_json = ti.xcom_pull(task_ids='source_to_raw1')
    # Passing literal JSON to read_json is deprecated (pandas >= 2.1); use a buffer.
    foot_perf_data = pd.read_json(io.StringIO(raw_json))

    def per_match_total(column):
        # Presumably the source columns are per-90 rates: scale by minutes / 90.
        return ((foot_perf_data[column] * foot_perf_data['Min']) / 90).round(0).astype(int)

    def success_rate(made, attempted):
        # 0/0 yields NaN, which the fillna(0) below turns into 0.
        return ((made / attempted) * 100).round(2)

    df_football_stats = foot_perf_data[['Player', 'Nation', 'Pos', 'Squad', 'Comp', 'Age', 'MP']].copy()
    df_football_stats['Goals'] = per_match_total('Goals')
    passes = per_match_total('PasTotAtt')
    df_football_stats['Pass'] = passes
    df_football_stats['PassComp%'] = success_rate(per_match_total('PasTotCmp'), passes)
    df_football_stats['Assist'] = per_match_total('Assists')
    df_football_stats['Tackle_Won'] = per_match_total('TklWon')
    df_football_stats['DribbleComp%'] = success_rate(per_match_total('DriSucc'), per_match_total('DriAtt'))
    df_football_stats = df_football_stats.fillna(0)

    # Create a folder named "Formatted_result" if it doesn't exist
    folder_path = "Formatted_result"
    os.makedirs(folder_path, exist_ok=True)

    # Save the DataFrame as a CSV file inside the "Formatted_result" folder
    file_path = os.path.join(folder_path, "formatted_football_stats.csv")
    df_football_stats.to_csv(file_path, index=False)

    # Print the new Dataframe Result
    print(df_football_stats)
    cleaned_foot_perf_data = df_football_stats.to_json(orient='records')
    return cleaned_foot_perf_data




def clean_market_value(ti):
    """Format the raw market-value records produced by ``source_to_raw2``.

    The first column is dropped (the original notes it was an "Unnamed"
    index column). The ``Market value`` text is parsed into a float
    ``market_int`` by stripping every ``m`` and ``€`` character (e.g. a
    value like ``"€45.00m"`` becomes ``45.0``). The ``Nation``, ``Pos`` and
    ``Market value`` columns are then removed.

    Args:
        ti: Airflow task instance; only ``ti.xcom_pull(task_ids=...)`` is used.
            The pulled value is expected to be a JSON records string.

    Returns:
        JSON string (``orient='records'``) of the remaining columns. The same
        table is written to ``Formatted_result/filtered_market_value.csv``.

    Raises:
        ValueError: if a market value still is not numeric after stripping
            (NOTE(review): formats such as "€500k" would hit this — confirm
            the source data only uses the "m" suffix).
    """
    import io
    import os

    import pandas as pd

    print("formatted market value data task")
    raw_json = ti.xcom_pull(task_ids='source_to_raw2')
    # Passing literal JSON to read_json is deprecated (pandas >= 2.1); use a buffer.
    market_value_data = pd.read_json(io.StringIO(raw_json))

    # Delete the 1st column which was "Unnamed"
    filtered_data = market_value_data.drop(columns=market_value_data.columns[0])

    # Strip the unit/currency characters, then convert the column to float.
    filtered_data['market_int'] = (
        filtered_data['Market value']
        .str.replace('m', '', regex=False)
        .str.replace("€", "", regex=False)
        .astype(float)
    )

    filtered_data = filtered_data.drop(columns=["Nation", "Pos", "Market value"])  # We delete useless columns

    # Create a folder named "Formatted_result" if it doesn't exist
    folder_path = "Formatted_result"
    os.makedirs(folder_path, exist_ok=True)

    # Save the DataFrame as a CSV file inside the "Formatted_result" folder
    file_path = os.path.join(folder_path, "filtered_market_value.csv")
    filtered_data.to_csv(file_path, index=False)
    print(filtered_data)
    clean_market_value_data = filtered_data.to_json(orient='records')

    return clean_market_value_data

#cleaned_df_value_data = clean_market_value(market_value_data)





def join_datasets(ti):
    """Left-join the formatted performance and market-value tables, then bucket them.

    Pulls the JSON records returned by the ``raw_to_formatted1`` (performance)
    and ``raw_to_formatted2`` (market value) tasks and left-joins them on
    ``Player``. Missing ``market_int`` values are filled with half of the
    median, rounded to 2 decimals. Interval columns are added through the
    ``interval_*`` helpers defined elsewhere in this notebook. ``market_int``
    is renamed to ``MarketValue (M €)`` and only the first row per player is
    kept.

    Args:
        ti: Airflow task instance; only ``ti.xcom_pull(task_ids=...)`` is used.

    Returns:
        The merged table as a JSON string (``orient='records'``). It is also
        written to ``Combined_result/merged_data.csv``.
    """
    import io
    import os

    import pandas as pd

    # We load our 2 formatted Datasets
    print("join 2 datasets task")
    # Passing literal JSON to read_json is deprecated (pandas >= 2.1); use a buffer.
    cleaned_df_perf = pd.read_json(io.StringIO(ti.xcom_pull(task_ids='raw_to_formatted1')))
    cleaned_df_value = pd.read_json(io.StringIO(ti.xcom_pull(task_ids='raw_to_formatted2')))
    print(cleaned_df_value)
    print(cleaned_df_perf)

    # After loading them, we join foot_perf_data and market_value_data into a single df : merged_df
    merged_df = pd.merge(cleaned_df_perf, cleaned_df_value, on='Player', how='left')
    # Players without a market value get half of the median value.
    median_value = merged_df['market_int'].median()
    merged_df['market_int'] = merged_df['market_int'].fillna(round(median_value / 2, 2))
    merged_df['Players Age interval'] = merged_df['Age'].apply(interval_age)
    merged_df['Match played interval'] = merged_df['MP'].apply(interval_match)
    merged_df['Goals interval'] = merged_df['Goals'].apply(interval_goal)
    merged_df['Successful Pass rate (%)'] = merged_df['PassComp%'].apply(interval_pass)
    merged_df['Successful Dribble rate (%)'] = merged_df['DribbleComp%'].apply(interval_dribble)
    merged_df['market_interval (M €)'] = merged_df['market_int'].apply(interval_value)
    merged_df = merged_df.rename(columns={'market_int': 'MarketValue (M €)'})
    # A player name matching several market-value rows is duplicated by the join.
    merged_df = merged_df.drop_duplicates(["Player"], keep="first")

    # Create a folder named "Combined_result" if it doesn't exist
    folder_path = "Combined_result"
    os.makedirs(folder_path, exist_ok=True)

    # Save the DataFrame as a CSV file inside the "Combined_result" folder
    file_path = os.path.join(folder_path, "merged_data.csv")
    merged_df.to_csv(file_path, index=False)
    print(merged_df)

    merged_data = merged_df.to_json(orient='records')
    return merged_data

#merged_data = join_datasets(ti)


-------

# Méthode PySpark

In [None]:
# Install PySpark into the notebook environment (IPython shell escape, not plain Python).
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_replace, col, round

def clean_foot_perf(df):
    """PySpark version of the performance-data formatter.

    Converts ``df`` to a Spark DataFrame and scales rate columns to rounded
    integer totals with ``rate * Min / 90``. Pass and dribble success
    percentages are computed from the raw rates (rounded to 2 decimals), and
    nulls in numeric columns are filled with 0.

    Args:
        df: Table accepted by ``spark.createDataFrame`` — presumably a pandas
            DataFrame of raw performance stats (NOTE(review): the DAG's
            ``load_foot_perf`` returns a JSON string; confirm the caller).

    Returns:
        JSON string (``orient='records'``) with the columns Player, Nation,
        Pos, Squad, Comp, Age, MP, G/90, G/Sh, Goals, Pass, PassCompleted,
        PassComp%, Assist, Tackle_Won, SucDribble, Dribble, DribbleComp%.
        The same table is written to
        ``Formatted_result/formatted_football_stats.csv``.
    """
    import os

    from pyspark.sql import SparkSession
    # Use the namespaced functions so Spark's `round` is never confused with the builtin.
    from pyspark.sql import functions as F

    print("cleaned foot perf data task")

    # Create SparkSession if it doesn't exist
    spark = SparkSession.builder.getOrCreate()

    # Convert the input table to a PySpark DataFrame (schema is inferred once).
    df_foot_perf_data = spark.createDataFrame(df)
    print(df_foot_perf_data)
    df_foot_perf_data.show()

    def per_match_total(column):
        # Presumably a per-90 rate: scale by minutes / 90 and round to an integer.
        return F.round((F.col(column) * F.col('Min')) / 90).cast('integer')

    def success_rate(made, attempted):
        # Division by zero gives null in Spark; the na.fill(0) below turns it into 0.
        return F.round((F.col(made) / F.col(attempted)) * 100, 2)

    df_football_stats = df_foot_perf_data.select(
        'Player', 'Nation', 'Pos', 'Squad', 'Comp', 'Age', 'MP',
        F.col('Goals').alias('G/90'),
        'G/Sh',
        per_match_total('Goals').alias('Goals'),
        per_match_total('PasTotAtt').alias('Pass'),
        per_match_total('PasTotCmp').alias('PassCompleted'),
        success_rate('PasTotCmp', 'PasTotAtt').alias('PassComp%'),
        per_match_total('Assists').alias('Assist'),
        per_match_total('TklWon').alias('Tackle_Won'),
        per_match_total('DriSucc').alias('SucDribble'),
        per_match_total('DriAtt').alias('Dribble'),
        success_rate('DriSucc', 'DriAtt').alias('DribbleComp%'),
    ).na.fill(0)  # fills numeric columns only

    # Create a folder named "Formatted_result" if it doesn't exist
    folder_path = "Formatted_result"
    os.makedirs(folder_path, exist_ok=True)

    # Collect to pandas once and reuse it for both the CSV and the JSON output.
    pandas_stats = df_football_stats.toPandas()
    file_path = os.path.join(folder_path, "formatted_football_stats.csv")
    pandas_stats.to_csv(file_path, index=False)

    # Print the new DataFrame Result
    df_football_stats.show(truncate=False)

    return pandas_stats.to_json(orient='records')


In [None]:
# Load the raw performance data, clean it with PySpark, and echo the JSON result.
raw_perf = load_foot_perf()
perf = clean_foot_perf(raw_perf)
perf

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import FloatType

def clean_market_value(df):
    """PySpark version of the market-value formatter.

    Drops the first column and parses ``Market value`` into a numeric
    ``market_int`` by removing every ``m`` and ``€`` character. It then drops
    ``Nation``, ``Pos`` and ``Market value``.

    ``market_int`` is cast to DoubleType, not FloatType. A float32 value
    widens with noise (1.2 becomes 1.2000000476837158) and that noise leaks
    into the JSON/CSV output. It would also differ from the pandas
    implementation, which uses float64.

    Args:
        df: Table accepted by ``spark.createDataFrame`` — presumably a pandas
            DataFrame of raw market values (confirm with caller).

    Returns:
        JSON string (``orient='records'``) of the remaining columns. The same
        table is written to ``Formatted_result/filtered_market_value.csv``.
    """
    import os

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, regexp_replace
    from pyspark.sql.types import DoubleType

    print("formatted market value data task")

    # Create SparkSession if it doesn't exist
    spark = SparkSession.builder.getOrCreate()

    # Convert the input table to a PySpark DataFrame
    df_market_value_data = spark.createDataFrame(df)

    # Drop the first column, then parse the market value and drop unused columns.
    df_market_value_data = (
        df_market_value_data
        .drop(df_market_value_data.columns[0])
        .withColumn("market_int", regexp_replace(col("Market value"), "m|€", "").cast(DoubleType()))
        .drop("Nation", "Pos", "Market value")
    )

    df_market_value_data.show(truncate=False)

    # Create a folder named "Formatted_result" if it doesn't exist
    folder_path = "Formatted_result"
    os.makedirs(folder_path, exist_ok=True)

    # Collect to pandas once and reuse it for both the CSV and the JSON output.
    pandas_values = df_market_value_data.toPandas()
    file_path = os.path.join(folder_path, "filtered_market_value.csv")
    pandas_values.to_csv(file_path, index=False)

    return pandas_values.to_json(orient='records')


In [None]:
# Load the raw market values, clean them with PySpark, and echo the JSON result.
raw_value = load_market_value()
value = clean_market_value(raw_value)
value

In [None]:
from pyspark.sql import SparkSession

def join_datasets(clean_foot_perf, clean_foot_value):
    """Left-join cleaned performance and market-value records, then bucket them.

    Mirrors the pandas ``join_datasets`` task:

    * missing ``market_int`` values are filled with half the median, rounded
      to 2 decimals;
    * interval columns (including the players' age interval) are added
      through the ``interval_*`` helpers defined elsewhere in this notebook;
    * ``market_int`` is renamed to ``MarketValue (M €)``;
    * only the first row per player is kept.

    Args:
        clean_foot_perf: JSON records string of cleaned performance stats.
        clean_foot_value: JSON records string of cleaned market values.

    Returns:
        The merged pandas DataFrame. It is also written to
        ``Combined_result/merged_data.csv`` and displayed through Spark.
    """
    import builtins
    import io
    import os

    import pandas as pd
    from pyspark.sql import SparkSession

    # Passing literal JSON to read_json is deprecated (pandas >= 2.1); use buffers.
    clean_foot_perf_df = pd.read_json(io.StringIO(clean_foot_perf))
    clean_foot_value_df = pd.read_json(io.StringIO(clean_foot_value))

    # Join the DataFrames
    merged_df = pd.merge(clean_foot_perf_df, clean_foot_value_df, on='Player', how='left')

    median_value = merged_df['market_int'].median()
    # The PySpark cells import `round` from pyspark.sql.functions into the notebook
    # namespace, so call the Python builtin explicitly.
    merged_df['market_int'] = merged_df['market_int'].fillna(builtins.round(median_value / 2, 2))
    merged_df['Players Age interval'] = merged_df['Age'].apply(interval_age)
    merged_df['Match played interval'] = merged_df['MP'].apply(interval_match)
    merged_df['Goals interval'] = merged_df['Goals'].apply(interval_goal)
    merged_df['Successful Pass rate (%)'] = merged_df['PassComp%'].apply(interval_pass)
    merged_df['Successful Dribble rate (%)'] = merged_df['DribbleComp%'].apply(interval_dribble)
    merged_df['market_interval (M €)'] = merged_df['market_int'].apply(interval_value)
    merged_df = merged_df.rename(columns={'market_int': 'MarketValue (M €)'})
    merged_df = merged_df.drop_duplicates(["Player"], keep="first")

    # Create a folder named "Combined_result" if it doesn't exist
    folder_path = "Combined_result"
    os.makedirs(folder_path, exist_ok=True)

    # Save the DataFrame as a CSV file inside the "Combined_result" folder
    file_path = os.path.join(folder_path, "merged_data.csv")
    merged_df.to_csv(file_path, index=False)

    # Display the result through Spark (SparkSession is created if needed).
    spark = SparkSession.builder.getOrCreate()
    spark.createDataFrame(merged_df).show(truncate=False)

    return merged_df



In [None]:
# Join the cleaned performance and market-value tables and display the merge.
merged = join_datasets(perf, value)
merged